Merge pull request #5979 from influxdata/js-5974-aux-iterator-close-panic

Fix aux iterators to respect early closing
pull/5401/merge
Jonathan A. Sternberg 2016-03-14 12:03:50 -04:00
commit 3f68bd12ee
4 changed files with 377 additions and 74 deletions

View File

@ -572,9 +572,10 @@ func (itr *floatIntervalIterator) Next() *FloatPoint {
// floatAuxIterator represents a float implementation of AuxIterator.
type floatAuxIterator struct {
input *bufFloatIterator
output chan *FloatPoint
fields auxIteratorFields
input *bufFloatIterator
output chan *FloatPoint
fields auxIteratorFields
background bool
}
func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt IteratorOptions) *floatAuxIterator {
@ -585,6 +586,12 @@ func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt Iterato
}
}
func (itr *floatAuxIterator) Background() {
itr.background = true
itr.Start()
go drainIterator(itr)
}
func (itr *floatAuxIterator) Start() { go itr.stream() }
func (itr *floatAuxIterator) Close() error { return itr.input.Close() }
func (itr *floatAuxIterator) Next() *FloatPoint { return <-itr.output }
@ -622,7 +629,9 @@ func (itr *floatAuxIterator) stream() {
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
if ok := itr.fields.send(p); !ok && itr.background {
break
}
}
close(itr.output)
@ -631,16 +640,73 @@ func (itr *floatAuxIterator) stream() {
// floatChanIterator represents a new instance of floatChanIterator.
type floatChanIterator struct {
c chan *FloatPoint
once sync.Once
buf *FloatPoint
cond *sync.Cond
done bool
}
func (itr *floatChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
itr.cond.L.Lock()
// Mark the channel iterator as done and signal all waiting goroutines to start again.
itr.done = true
itr.cond.Broadcast()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
return nil
}
func (itr *floatChanIterator) Next() *FloatPoint { return <-itr.c }
func (itr *floatChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool {
itr.cond.L.Lock()
defer itr.cond.L.Unlock()
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
for !itr.done && itr.buf != nil {
itr.cond.Wait()
}
// Do not set the value and return false to signal that the iterator is closed.
// Do this after the above wait as the above for loop may have exited because
// the iterator was closed.
if itr.done {
return false
}
switch v := value.(type) {
case float64:
itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Value: v}
case int64:
itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Value: float64(v)}
default:
itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Nil: true}
}
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
}
func (itr *floatChanIterator) Next() *FloatPoint {
itr.cond.L.Lock()
// Wait until either a value is available in the buffer or
// the iterator is closed.
for !itr.done && itr.buf == nil {
itr.cond.Wait()
}
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
p := itr.buf
itr.buf = nil
itr.cond.Signal()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
return p
}
// floatReduceFloatIterator executes a reducer for every interval and buffers the result.
type floatReduceFloatIterator struct {
@ -1739,9 +1805,10 @@ func (itr *integerIntervalIterator) Next() *IntegerPoint {
// integerAuxIterator represents a integer implementation of AuxIterator.
type integerAuxIterator struct {
input *bufIntegerIterator
output chan *IntegerPoint
fields auxIteratorFields
input *bufIntegerIterator
output chan *IntegerPoint
fields auxIteratorFields
background bool
}
func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt IteratorOptions) *integerAuxIterator {
@ -1752,6 +1819,12 @@ func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt Ite
}
}
func (itr *integerAuxIterator) Background() {
itr.background = true
itr.Start()
go drainIterator(itr)
}
func (itr *integerAuxIterator) Start() { go itr.stream() }
func (itr *integerAuxIterator) Close() error { return itr.input.Close() }
func (itr *integerAuxIterator) Next() *IntegerPoint { return <-itr.output }
@ -1789,7 +1862,9 @@ func (itr *integerAuxIterator) stream() {
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
if ok := itr.fields.send(p); !ok && itr.background {
break
}
}
close(itr.output)
@ -1798,16 +1873,70 @@ func (itr *integerAuxIterator) stream() {
// integerChanIterator represents a new instance of integerChanIterator.
type integerChanIterator struct {
c chan *IntegerPoint
once sync.Once
buf *IntegerPoint
cond *sync.Cond
done bool
}
func (itr *integerChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
itr.cond.L.Lock()
// Mark the channel iterator as done and signal all waiting goroutines to start again.
itr.done = true
itr.cond.Broadcast()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
return nil
}
func (itr *integerChanIterator) Next() *IntegerPoint { return <-itr.c }
func (itr *integerChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool {
itr.cond.L.Lock()
defer itr.cond.L.Unlock()
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
for !itr.done && itr.buf != nil {
itr.cond.Wait()
}
// Do not set the value and return false to signal that the iterator is closed.
// Do this after the above wait as the above for loop may have exited because
// the iterator was closed.
if itr.done {
return false
}
switch v := value.(type) {
case int64:
itr.buf = &IntegerPoint{Name: name, Tags: tags, Time: time, Value: v}
default:
itr.buf = &IntegerPoint{Name: name, Tags: tags, Time: time, Nil: true}
}
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
}
func (itr *integerChanIterator) Next() *IntegerPoint {
itr.cond.L.Lock()
// Wait until either a value is available in the buffer or
// the iterator is closed.
for !itr.done && itr.buf == nil {
itr.cond.Wait()
}
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
p := itr.buf
itr.buf = nil
itr.cond.Signal()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
return p
}
// integerReduceFloatIterator executes a reducer for every interval and buffers the result.
type integerReduceFloatIterator struct {
@ -2906,9 +3035,10 @@ func (itr *stringIntervalIterator) Next() *StringPoint {
// stringAuxIterator represents a string implementation of AuxIterator.
type stringAuxIterator struct {
input *bufStringIterator
output chan *StringPoint
fields auxIteratorFields
input *bufStringIterator
output chan *StringPoint
fields auxIteratorFields
background bool
}
func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt IteratorOptions) *stringAuxIterator {
@ -2919,6 +3049,12 @@ func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt Itera
}
}
func (itr *stringAuxIterator) Background() {
itr.background = true
itr.Start()
go drainIterator(itr)
}
func (itr *stringAuxIterator) Start() { go itr.stream() }
func (itr *stringAuxIterator) Close() error { return itr.input.Close() }
func (itr *stringAuxIterator) Next() *StringPoint { return <-itr.output }
@ -2956,7 +3092,9 @@ func (itr *stringAuxIterator) stream() {
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
if ok := itr.fields.send(p); !ok && itr.background {
break
}
}
close(itr.output)
@ -2965,16 +3103,70 @@ func (itr *stringAuxIterator) stream() {
// stringChanIterator represents a new instance of stringChanIterator.
type stringChanIterator struct {
c chan *StringPoint
once sync.Once
buf *StringPoint
cond *sync.Cond
done bool
}
func (itr *stringChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
itr.cond.L.Lock()
// Mark the channel iterator as done and signal all waiting goroutines to start again.
itr.done = true
itr.cond.Broadcast()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
return nil
}
func (itr *stringChanIterator) Next() *StringPoint { return <-itr.c }
func (itr *stringChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool {
itr.cond.L.Lock()
defer itr.cond.L.Unlock()
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
for !itr.done && itr.buf != nil {
itr.cond.Wait()
}
// Do not set the value and return false to signal that the iterator is closed.
// Do this after the above wait as the above for loop may have exited because
// the iterator was closed.
if itr.done {
return false
}
switch v := value.(type) {
case string:
itr.buf = &StringPoint{Name: name, Tags: tags, Time: time, Value: v}
default:
itr.buf = &StringPoint{Name: name, Tags: tags, Time: time, Nil: true}
}
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
}
func (itr *stringChanIterator) Next() *StringPoint {
itr.cond.L.Lock()
// Wait until either a value is available in the buffer or
// the iterator is closed.
for !itr.done && itr.buf == nil {
itr.cond.Wait()
}
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
p := itr.buf
itr.buf = nil
itr.cond.Signal()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
return p
}
// stringReduceFloatIterator executes a reducer for every interval and buffers the result.
type stringReduceFloatIterator struct {
@ -4073,9 +4265,10 @@ func (itr *booleanIntervalIterator) Next() *BooleanPoint {
// booleanAuxIterator represents a boolean implementation of AuxIterator.
type booleanAuxIterator struct {
input *bufBooleanIterator
output chan *BooleanPoint
fields auxIteratorFields
input *bufBooleanIterator
output chan *BooleanPoint
fields auxIteratorFields
background bool
}
func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt IteratorOptions) *booleanAuxIterator {
@ -4086,6 +4279,12 @@ func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt Ite
}
}
func (itr *booleanAuxIterator) Background() {
itr.background = true
itr.Start()
go drainIterator(itr)
}
func (itr *booleanAuxIterator) Start() { go itr.stream() }
func (itr *booleanAuxIterator) Close() error { return itr.input.Close() }
func (itr *booleanAuxIterator) Next() *BooleanPoint { return <-itr.output }
@ -4123,7 +4322,9 @@ func (itr *booleanAuxIterator) stream() {
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
if ok := itr.fields.send(p); !ok && itr.background {
break
}
}
close(itr.output)
@ -4132,16 +4333,70 @@ func (itr *booleanAuxIterator) stream() {
// booleanChanIterator represents a new instance of booleanChanIterator.
type booleanChanIterator struct {
c chan *BooleanPoint
once sync.Once
buf *BooleanPoint
cond *sync.Cond
done bool
}
func (itr *booleanChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
itr.cond.L.Lock()
// Mark the channel iterator as done and signal all waiting goroutines to start again.
itr.done = true
itr.cond.Broadcast()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
return nil
}
func (itr *booleanChanIterator) Next() *BooleanPoint { return <-itr.c }
func (itr *booleanChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool {
itr.cond.L.Lock()
defer itr.cond.L.Unlock()
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
for !itr.done && itr.buf != nil {
itr.cond.Wait()
}
// Do not set the value and return false to signal that the iterator is closed.
// Do this after the above wait as the above for loop may have exited because
// the iterator was closed.
if itr.done {
return false
}
switch v := value.(type) {
case bool:
itr.buf = &BooleanPoint{Name: name, Tags: tags, Time: time, Value: v}
default:
itr.buf = &BooleanPoint{Name: name, Tags: tags, Time: time, Nil: true}
}
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
}
func (itr *booleanChanIterator) Next() *BooleanPoint {
itr.cond.L.Lock()
// Wait until either a value is available in the buffer or
// the iterator is closed.
for !itr.done && itr.buf == nil {
itr.cond.Wait()
}
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
p := itr.buf
itr.buf = nil
itr.cond.Signal()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
return p
}
// booleanReduceFloatIterator executes a reducer for every interval and buffers the result.
type booleanReduceFloatIterator struct {

View File

@ -571,9 +571,10 @@ func (itr *{{$k.name}}IntervalIterator) Next() *{{$k.Name}}Point {
// {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator.
type {{$k.name}}AuxIterator struct {
input *buf{{$k.Name}}Iterator
output chan *{{$k.Name}}Point
fields auxIteratorFields
input *buf{{$k.Name}}Iterator
output chan *{{$k.Name}}Point
fields auxIteratorFields
background bool
}
func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{$k.name}}AuxIterator {
@ -584,6 +585,12 @@ func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList,
}
}
func (itr *{{$k.name}}AuxIterator) Background() {
itr.background = true
itr.Start()
go drainIterator(itr)
}
func (itr *{{$k.name}}AuxIterator) Start() { go itr.stream() }
func (itr *{{$k.name}}AuxIterator) Close() error { return itr.input.Close() }
func (itr *{{$k.name}}AuxIterator) Next() *{{$k.Name}}Point { return <-itr.output }
@ -621,7 +628,9 @@ func (itr *{{$k.name}}AuxIterator) stream() {
// Send point to output and to each field iterator.
itr.output <- p
itr.fields.send(p)
if ok := itr.fields.send(p); !ok && itr.background {
break
}
}
close(itr.output)
@ -630,16 +639,73 @@ func (itr *{{$k.name}}AuxIterator) stream() {
// {{$k.name}}ChanIterator represents a new instance of {{$k.name}}ChanIterator.
type {{$k.name}}ChanIterator struct {
c chan *{{$k.Name}}Point
once sync.Once
buf *{{$k.Name}}Point
cond *sync.Cond
done bool
}
func (itr *{{$k.name}}ChanIterator) Close() error {
itr.once.Do(func() { close(itr.c) })
itr.cond.L.Lock()
// Mark the channel iterator as done and signal all waiting goroutines to start again.
itr.done = true
itr.cond.Broadcast()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
return nil
}
func (itr *{{$k.name}}ChanIterator) Next() *{{$k.Name}}Point { return <-itr.c }
func (itr *{{$k.name}}ChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool {
itr.cond.L.Lock()
defer itr.cond.L.Unlock()
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
for !itr.done && itr.buf != nil {
itr.cond.Wait()
}
// Do not set the value and return false to signal that the iterator is closed.
// Do this after the above wait as the above for loop may have exited because
// the iterator was closed.
if itr.done {
return false
}
switch v := value.(type) {
case {{$k.Type}}:
itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: v}
{{if eq $k.Name "Float"}}
case int64:
itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: float64(v)}
{{end}}
default:
itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Nil: true}
}
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
}
func (itr *{{$k.name}}ChanIterator) Next() *{{$k.Name}}Point {
itr.cond.L.Lock()
// Wait until either a value is available in the buffer or
// the iterator is closed.
for !itr.done && itr.buf == nil {
itr.cond.Wait()
}
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
p := itr.buf
itr.buf = nil
itr.cond.Signal()
// Do not defer the unlock so we don't create an unnecessary allocation.
itr.cond.L.Unlock()
return p
}
{{range $v := $types}}

View File

@ -232,6 +232,10 @@ type AuxIterator interface {
// Start starts writing to the created iterators.
Start()
// Backgrounds the iterator so that, when start is called, it will
// continuously read from the iterator.
Background()
}
// NewAuxIterator returns a new instance of AuxIterator.
@ -312,19 +316,19 @@ func (a auxIteratorFields) iterator(name string) Iterator {
// Create channel iterator by data type.
switch f.typ {
case Float:
itr := &floatChanIterator{c: make(chan *FloatPoint, 1)}
itr := &floatChanIterator{cond: sync.NewCond(&sync.Mutex{})}
f.append(itr)
return itr
case Integer:
itr := &integerChanIterator{c: make(chan *IntegerPoint, 1)}
itr := &integerChanIterator{cond: sync.NewCond(&sync.Mutex{})}
f.append(itr)
return itr
case String:
itr := &stringChanIterator{c: make(chan *StringPoint, 1)}
itr := &stringChanIterator{cond: sync.NewCond(&sync.Mutex{})}
f.append(itr)
return itr
case Boolean:
itr := &booleanChanIterator{c: make(chan *BooleanPoint, 1)}
itr := &booleanChanIterator{cond: sync.NewCond(&sync.Mutex{})}
f.append(itr)
return itr
default:
@ -336,7 +340,7 @@ func (a auxIteratorFields) iterator(name string) Iterator {
}
// send sends a point to all field iterators.
func (a auxIteratorFields) send(p Point) {
func (a auxIteratorFields) send(p Point) (ok bool) {
values := p.aux()
for i, f := range a {
v := values[i]
@ -349,40 +353,19 @@ func (a auxIteratorFields) send(p Point) {
for _, itr := range f.itrs {
switch itr := itr.(type) {
case *floatChanIterator:
switch v := v.(type) {
case float64:
itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v}
case int64:
itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: float64(v)}
default:
itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true}
}
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
case *integerChanIterator:
switch v := v.(type) {
case int64:
itr.c <- &IntegerPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v}
default:
itr.c <- &IntegerPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true}
}
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
case *stringChanIterator:
switch v := v.(type) {
case string:
itr.c <- &StringPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v}
default:
itr.c <- &StringPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true}
}
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
case *booleanChanIterator:
switch v := v.(type) {
case bool:
itr.c <- &BooleanPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v}
default:
itr.c <- &BooleanPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true}
}
ok = itr.setBuf(p.name(), tags, p.time(), v) || ok
default:
panic(fmt.Sprintf("invalid aux itr type: %T", itr))
}
}
}
return ok
}
// drainIterator reads all points from an iterator.

View File

@ -121,10 +121,9 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) (
panic("unreachable")
}
}
aitr.Start()
// Drain primary aux iterator since there is no reader for it.
go drainIterator(aitr)
// Background the primary iterator since there is no reader for it.
aitr.Background()
return itrs, nil
}