diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index 09095fcd5e..cbae04d579 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -572,9 +572,10 @@ func (itr *floatIntervalIterator) Next() *FloatPoint { // floatAuxIterator represents a float implementation of AuxIterator. type floatAuxIterator struct { - input *bufFloatIterator - output chan *FloatPoint - fields auxIteratorFields + input *bufFloatIterator + output chan *FloatPoint + fields auxIteratorFields + background bool } func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt IteratorOptions) *floatAuxIterator { @@ -585,6 +586,12 @@ func newFloatAuxIterator(input FloatIterator, seriesKeys SeriesList, opt Iterato } } +func (itr *floatAuxIterator) Background() { + itr.background = true + itr.Start() + go drainIterator(itr) +} + func (itr *floatAuxIterator) Start() { go itr.stream() } func (itr *floatAuxIterator) Close() error { return itr.input.Close() } func (itr *floatAuxIterator) Next() *FloatPoint { return <-itr.output } @@ -622,7 +629,9 @@ func (itr *floatAuxIterator) stream() { // Send point to output and to each field iterator. itr.output <- p - itr.fields.send(p) + if ok := itr.fields.send(p); !ok && itr.background { + break + } } close(itr.output) @@ -631,16 +640,73 @@ func (itr *floatAuxIterator) stream() { // floatChanIterator represents a new instance of floatChanIterator. type floatChanIterator struct { - c chan *FloatPoint - once sync.Once + buf *FloatPoint + cond *sync.Cond + done bool } func (itr *floatChanIterator) Close() error { - itr.once.Do(func() { close(itr.c) }) + itr.cond.L.Lock() + // Mark the channel iterator as done and signal all waiting goroutines to start again. + itr.done = true + itr.cond.Broadcast() + // Do not defer the unlock so we don't create an unnecessary allocation. + itr.cond.L.Unlock() return nil } -func (itr *floatChanIterator) Next() *FloatPoint { return <-itr.c } +func (itr *floatChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool { + itr.cond.L.Lock() + defer itr.cond.L.Unlock() + + // Wait for either the iterator to be done (so we don't have to set the value) + // or for the buffer to have been read and ready for another write. + for !itr.done && itr.buf != nil { + itr.cond.Wait() + } + + // Do not set the value and return false to signal that the iterator is closed. + // Do this after the above wait as the above for loop may have exited because + // the iterator was closed. + if itr.done { + return false + } + + switch v := value.(type) { + case float64: + itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Value: v} + + case int64: + itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Value: float64(v)} + + default: + itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Nil: true} + } + // Signal to all waiting goroutines that a new value is ready to read. + itr.cond.Signal() + return true +} + +func (itr *floatChanIterator) Next() *FloatPoint { + itr.cond.L.Lock() + + // Wait until either a value is available in the buffer or + // the iterator is closed. + for !itr.done && itr.buf == nil { + itr.cond.Wait() + } + + // Always read from the buffer if it exists, even if the iterator + // is closed. This prevents the last value from being truncated by + // the parent iterator. + p := itr.buf + itr.buf = nil + itr.cond.Signal() + + // Do not defer the unlock so we don't create an unnecessary allocation. + itr.cond.L.Unlock() + return p +} // floatReduceFloatIterator executes a reducer for every interval and buffers the result. type floatReduceFloatIterator struct { @@ -1739,9 +1805,10 @@ func (itr *integerIntervalIterator) Next() *IntegerPoint { // integerAuxIterator represents a integer implementation of AuxIterator. type integerAuxIterator struct { - input *bufIntegerIterator - output chan *IntegerPoint - fields auxIteratorFields + input *bufIntegerIterator + output chan *IntegerPoint + fields auxIteratorFields + background bool } func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt IteratorOptions) *integerAuxIterator { @@ -1752,6 +1819,12 @@ func newIntegerAuxIterator(input IntegerIterator, seriesKeys SeriesList, opt Ite } } +func (itr *integerAuxIterator) Background() { + itr.background = true + itr.Start() + go drainIterator(itr) +} + func (itr *integerAuxIterator) Start() { go itr.stream() } func (itr *integerAuxIterator) Close() error { return itr.input.Close() } func (itr *integerAuxIterator) Next() *IntegerPoint { return <-itr.output } @@ -1789,7 +1862,9 @@ func (itr *integerAuxIterator) stream() { // Send point to output and to each field iterator. itr.output <- p - itr.fields.send(p) + if ok := itr.fields.send(p); !ok && itr.background { + break + } } close(itr.output) @@ -1798,16 +1873,70 @@ func (itr *integerAuxIterator) stream() { // integerChanIterator represents a new instance of integerChanIterator. type integerChanIterator struct { - c chan *IntegerPoint - once sync.Once + buf *IntegerPoint + cond *sync.Cond + done bool } func (itr *integerChanIterator) Close() error { - itr.once.Do(func() { close(itr.c) }) + itr.cond.L.Lock() + // Mark the channel iterator as done and signal all waiting goroutines to start again. + itr.done = true + itr.cond.Broadcast() + // Do not defer the unlock so we don't create an unnecessary allocation. + itr.cond.L.Unlock() return nil } -func (itr *integerChanIterator) Next() *IntegerPoint { return <-itr.c } +func (itr *integerChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool { + itr.cond.L.Lock() + defer itr.cond.L.Unlock() + + // Wait for either the iterator to be done (so we don't have to set the value) + // or for the buffer to have been read and ready for another write. + for !itr.done && itr.buf != nil { + itr.cond.Wait() + } + + // Do not set the value and return false to signal that the iterator is closed. + // Do this after the above wait as the above for loop may have exited because + // the iterator was closed. + if itr.done { + return false + } + + switch v := value.(type) { + case int64: + itr.buf = &IntegerPoint{Name: name, Tags: tags, Time: time, Value: v} + + default: + itr.buf = &IntegerPoint{Name: name, Tags: tags, Time: time, Nil: true} + } + // Signal to all waiting goroutines that a new value is ready to read. + itr.cond.Signal() + return true +} + +func (itr *integerChanIterator) Next() *IntegerPoint { + itr.cond.L.Lock() + + // Wait until either a value is available in the buffer or + // the iterator is closed. + for !itr.done && itr.buf == nil { + itr.cond.Wait() + } + + // Always read from the buffer if it exists, even if the iterator + // is closed. This prevents the last value from being truncated by + // the parent iterator. + p := itr.buf + itr.buf = nil + itr.cond.Signal() + + // Do not defer the unlock so we don't create an unnecessary allocation. + itr.cond.L.Unlock() + return p +} // integerReduceFloatIterator executes a reducer for every interval and buffers the result. type integerReduceFloatIterator struct { @@ -2906,9 +3035,10 @@ func (itr *stringIntervalIterator) Next() *StringPoint { // stringAuxIterator represents a string implementation of AuxIterator. type stringAuxIterator struct { - input *bufStringIterator - output chan *StringPoint - fields auxIteratorFields + input *bufStringIterator + output chan *StringPoint + fields auxIteratorFields + background bool } func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt IteratorOptions) *stringAuxIterator { @@ -2919,6 +3049,12 @@ func newStringAuxIterator(input StringIterator, seriesKeys SeriesList, opt Itera } } +func (itr *stringAuxIterator) Background() { + itr.background = true + itr.Start() + go drainIterator(itr) +} + func (itr *stringAuxIterator) Start() { go itr.stream() } func (itr *stringAuxIterator) Close() error { return itr.input.Close() } func (itr *stringAuxIterator) Next() *StringPoint { return <-itr.output } @@ -2956,7 +3092,9 @@ func (itr *stringAuxIterator) stream() { // Send point to output and to each field iterator. itr.output <- p - itr.fields.send(p) + if ok := itr.fields.send(p); !ok && itr.background { + break + } } close(itr.output) @@ -2965,16 +3103,70 @@ func (itr *stringAuxIterator) stream() { // stringChanIterator represents a new instance of stringChanIterator. type stringChanIterator struct { - c chan *StringPoint - once sync.Once + buf *StringPoint + cond *sync.Cond + done bool } func (itr *stringChanIterator) Close() error { - itr.once.Do(func() { close(itr.c) }) + itr.cond.L.Lock() + // Mark the channel iterator as done and signal all waiting goroutines to start again. + itr.done = true + itr.cond.Broadcast() + // Do not defer the unlock so we don't create an unnecessary allocation. + itr.cond.L.Unlock() return nil } -func (itr *stringChanIterator) Next() *StringPoint { return <-itr.c } +func (itr *stringChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool { + itr.cond.L.Lock() + defer itr.cond.L.Unlock() + + // Wait for either the iterator to be done (so we don't have to set the value) + // or for the buffer to have been read and ready for another write. + for !itr.done && itr.buf != nil { + itr.cond.Wait() + } + + // Do not set the value and return false to signal that the iterator is closed. + // Do this after the above wait as the above for loop may have exited because + // the iterator was closed. + if itr.done { + return false + } + + switch v := value.(type) { + case string: + itr.buf = &StringPoint{Name: name, Tags: tags, Time: time, Value: v} + + default: + itr.buf = &StringPoint{Name: name, Tags: tags, Time: time, Nil: true} + } + // Signal to all waiting goroutines that a new value is ready to read. + itr.cond.Signal() + return true +} + +func (itr *stringChanIterator) Next() *StringPoint { + itr.cond.L.Lock() + + // Wait until either a value is available in the buffer or + // the iterator is closed. + for !itr.done && itr.buf == nil { + itr.cond.Wait() + } + + // Always read from the buffer if it exists, even if the iterator + // is closed. This prevents the last value from being truncated by + // the parent iterator. + p := itr.buf + itr.buf = nil + itr.cond.Signal() + + // Do not defer the unlock so we don't create an unnecessary allocation. + itr.cond.L.Unlock() + return p +} // stringReduceFloatIterator executes a reducer for every interval and buffers the result. type stringReduceFloatIterator struct { @@ -4073,9 +4265,10 @@ func (itr *booleanIntervalIterator) Next() *BooleanPoint { // booleanAuxIterator represents a boolean implementation of AuxIterator. type booleanAuxIterator struct { - input *bufBooleanIterator - output chan *BooleanPoint - fields auxIteratorFields + input *bufBooleanIterator + output chan *BooleanPoint + fields auxIteratorFields + background bool } func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt IteratorOptions) *booleanAuxIterator { @@ -4086,6 +4279,12 @@ func newBooleanAuxIterator(input BooleanIterator, seriesKeys SeriesList, opt Ite } } +func (itr *booleanAuxIterator) Background() { + itr.background = true + itr.Start() + go drainIterator(itr) +} + func (itr *booleanAuxIterator) Start() { go itr.stream() } func (itr *booleanAuxIterator) Close() error { return itr.input.Close() } func (itr *booleanAuxIterator) Next() *BooleanPoint { return <-itr.output } @@ -4123,7 +4322,9 @@ func (itr *booleanAuxIterator) stream() { // Send point to output and to each field iterator. itr.output <- p - itr.fields.send(p) + if ok := itr.fields.send(p); !ok && itr.background { + break + } } close(itr.output) @@ -4132,16 +4333,70 @@ func (itr *booleanAuxIterator) stream() { // booleanChanIterator represents a new instance of booleanChanIterator. type booleanChanIterator struct { - c chan *BooleanPoint - once sync.Once + buf *BooleanPoint + cond *sync.Cond + done bool } func (itr *booleanChanIterator) Close() error { - itr.once.Do(func() { close(itr.c) }) + itr.cond.L.Lock() + // Mark the channel iterator as done and signal all waiting goroutines to start again. + itr.done = true + itr.cond.Broadcast() + // Do not defer the unlock so we don't create an unnecessary allocation. + itr.cond.L.Unlock() return nil } -func (itr *booleanChanIterator) Next() *BooleanPoint { return <-itr.c } +func (itr *booleanChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool { + itr.cond.L.Lock() + defer itr.cond.L.Unlock() + + // Wait for either the iterator to be done (so we don't have to set the value) + // or for the buffer to have been read and ready for another write. + for !itr.done && itr.buf != nil { + itr.cond.Wait() + } + + // Do not set the value and return false to signal that the iterator is closed. + // Do this after the above wait as the above for loop may have exited because + // the iterator was closed. + if itr.done { + return false + } + + switch v := value.(type) { + case bool: + itr.buf = &BooleanPoint{Name: name, Tags: tags, Time: time, Value: v} + + default: + itr.buf = &BooleanPoint{Name: name, Tags: tags, Time: time, Nil: true} + } + // Signal to all waiting goroutines that a new value is ready to read. + itr.cond.Signal() + return true +} + +func (itr *booleanChanIterator) Next() *BooleanPoint { + itr.cond.L.Lock() + + // Wait until either a value is available in the buffer or + // the iterator is closed. + for !itr.done && itr.buf == nil { + itr.cond.Wait() + } + + // Always read from the buffer if it exists, even if the iterator + // is closed. This prevents the last value from being truncated by + // the parent iterator. + p := itr.buf + itr.buf = nil + itr.cond.Signal() + + // Do not defer the unlock so we don't create an unnecessary allocation. + itr.cond.L.Unlock() + return p +} // booleanReduceFloatIterator executes a reducer for every interval and buffers the result. type booleanReduceFloatIterator struct { diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 69f255b579..ad00a8634b 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -571,9 +571,10 @@ func (itr *{{$k.name}}IntervalIterator) Next() *{{$k.Name}}Point { // {{$k.name}}AuxIterator represents a {{$k.name}} implementation of AuxIterator. type {{$k.name}}AuxIterator struct { - input *buf{{$k.Name}}Iterator - output chan *{{$k.Name}}Point - fields auxIteratorFields + input *buf{{$k.Name}}Iterator + output chan *{{$k.Name}}Point + fields auxIteratorFields + background bool } func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList, opt IteratorOptions) *{{$k.name}}AuxIterator { @@ -584,6 +585,12 @@ func new{{$k.Name}}AuxIterator(input {{$k.Name}}Iterator, seriesKeys SeriesList, } } +func (itr *{{$k.name}}AuxIterator) Background() { + itr.background = true + itr.Start() + go drainIterator(itr) +} + func (itr *{{$k.name}}AuxIterator) Start() { go itr.stream() } func (itr *{{$k.name}}AuxIterator) Close() error { return itr.input.Close() } func (itr *{{$k.name}}AuxIterator) Next() *{{$k.Name}}Point { return <-itr.output } @@ -621,7 +628,9 @@ func (itr *{{$k.name}}AuxIterator) stream() { // Send point to output and to each field iterator. itr.output <- p - itr.fields.send(p) + if ok := itr.fields.send(p); !ok && itr.background { + break + } } close(itr.output) @@ -630,16 +639,73 @@ func (itr *{{$k.name}}AuxIterator) stream() { // {{$k.name}}ChanIterator represents a new instance of {{$k.name}}ChanIterator. type {{$k.name}}ChanIterator struct { - c chan *{{$k.Name}}Point - once sync.Once + buf *{{$k.Name}}Point + cond *sync.Cond + done bool } func (itr *{{$k.name}}ChanIterator) Close() error { - itr.once.Do(func() { close(itr.c) }) + itr.cond.L.Lock() + // Mark the channel iterator as done and signal all waiting goroutines to start again. + itr.done = true + itr.cond.Broadcast() + // Do not defer the unlock so we don't create an unnecessary allocation. + itr.cond.L.Unlock() return nil } -func (itr *{{$k.name}}ChanIterator) Next() *{{$k.Name}}Point { return <-itr.c } +func (itr *{{$k.name}}ChanIterator) setBuf(name string, tags Tags, time int64, value interface{}) bool { + itr.cond.L.Lock() + defer itr.cond.L.Unlock() + + // Wait for either the iterator to be done (so we don't have to set the value) + // or for the buffer to have been read and ready for another write. + for !itr.done && itr.buf != nil { + itr.cond.Wait() + } + + // Do not set the value and return false to signal that the iterator is closed. + // Do this after the above wait as the above for loop may have exited because + // the iterator was closed. + if itr.done { + return false + } + + switch v := value.(type) { + case {{$k.Type}}: + itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: v} +{{if eq $k.Name "Float"}} + case int64: + itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: float64(v)} +{{end}} + default: + itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Nil: true} + } + // Signal to all waiting goroutines that a new value is ready to read. + itr.cond.Signal() + return true +} + +func (itr *{{$k.name}}ChanIterator) Next() *{{$k.Name}}Point { + itr.cond.L.Lock() + + // Wait until either a value is available in the buffer or + // the iterator is closed. + for !itr.done && itr.buf == nil { + itr.cond.Wait() + } + + // Always read from the buffer if it exists, even if the iterator + // is closed. This prevents the last value from being truncated by + // the parent iterator. + p := itr.buf + itr.buf = nil + itr.cond.Signal() + + // Do not defer the unlock so we don't create an unnecessary allocation. + itr.cond.L.Unlock() + return p +} {{range $v := $types}} diff --git a/influxql/iterator.go b/influxql/iterator.go index cee4cc3ae6..d35b81f99a 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -232,6 +232,10 @@ type AuxIterator interface { // Start starts writing to the created iterators. Start() + + // Backgrounds the iterator so that, when start is called, it will + // continuously read from the iterator. + Background() } // NewAuxIterator returns a new instance of AuxIterator. @@ -312,19 +316,19 @@ func (a auxIteratorFields) iterator(name string) Iterator { // Create channel iterator by data type. switch f.typ { case Float: - itr := &floatChanIterator{c: make(chan *FloatPoint, 1)} + itr := &floatChanIterator{cond: sync.NewCond(&sync.Mutex{})} f.append(itr) return itr case Integer: - itr := &integerChanIterator{c: make(chan *IntegerPoint, 1)} + itr := &integerChanIterator{cond: sync.NewCond(&sync.Mutex{})} f.append(itr) return itr case String: - itr := &stringChanIterator{c: make(chan *StringPoint, 1)} + itr := &stringChanIterator{cond: sync.NewCond(&sync.Mutex{})} f.append(itr) return itr case Boolean: - itr := &booleanChanIterator{c: make(chan *BooleanPoint, 1)} + itr := &booleanChanIterator{cond: sync.NewCond(&sync.Mutex{})} f.append(itr) return itr default: @@ -336,7 +340,7 @@ func (a auxIteratorFields) iterator(name string) Iterator { } // send sends a point to all field iterators. -func (a auxIteratorFields) send(p Point) { +func (a auxIteratorFields) send(p Point) (ok bool) { values := p.aux() for i, f := range a { v := values[i] @@ -349,40 +353,19 @@ func (a auxIteratorFields) send(p Point) { for _, itr := range f.itrs { switch itr := itr.(type) { case *floatChanIterator: - switch v := v.(type) { - case float64: - itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v} - case int64: - itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: float64(v)} - default: - itr.c <- &FloatPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true} - } + ok = itr.setBuf(p.name(), tags, p.time(), v) || ok case *integerChanIterator: - switch v := v.(type) { - case int64: - itr.c <- &IntegerPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v} - default: - itr.c <- &IntegerPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true} - } + ok = itr.setBuf(p.name(), tags, p.time(), v) || ok case *stringChanIterator: - switch v := v.(type) { - case string: - itr.c <- &StringPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v} - default: - itr.c <- &StringPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true} - } + ok = itr.setBuf(p.name(), tags, p.time(), v) || ok case *booleanChanIterator: - switch v := v.(type) { - case bool: - itr.c <- &BooleanPoint{Name: p.name(), Tags: tags, Time: p.time(), Value: v} - default: - itr.c <- &BooleanPoint{Name: p.name(), Tags: tags, Time: p.time(), Nil: true} - } + ok = itr.setBuf(p.name(), tags, p.time(), v) || ok default: panic(fmt.Sprintf("invalid aux itr type: %T", itr)) } } } + return ok } // drainIterator reads all points from an iterator. diff --git a/influxql/select.go b/influxql/select.go index 226bdd3c8f..0721d1eace 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -121,10 +121,9 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) ( panic("unreachable") } } - aitr.Start() - // Drain primary aux iterator since there is no reader for it. - go drainIterator(aitr) + // Background the primary iterator since there is no reader for it. + aitr.Background() return itrs, nil }