Merge pull request #6384 from benbjohnson/chan-iterator-dbl-buffer

Add double buffer on ChanIterator
pull/6371/head
Ben Johnson 2016-04-14 13:53:08 -06:00
commit 42cbbcec2a
6 changed files with 213 additions and 38 deletions

View File

@ -726,7 +726,11 @@ func (itr *floatAuxIterator) stream() {
// floatChanIterator represents a new instance of floatChanIterator.
type floatChanIterator struct {
buf *FloatPoint
buf struct {
i int
filled bool
points [2]FloatPoint
}
cond *sync.Cond
done bool
}
@ -749,7 +753,7 @@ func (itr *floatChanIterator) setBuf(name string, tags Tags, time int64, value i
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
for !itr.done && itr.buf != nil {
for !itr.done && itr.buf.filled {
itr.cond.Wait()
}
@ -762,14 +766,16 @@ func (itr *floatChanIterator) setBuf(name string, tags Tags, time int64, value i
switch v := value.(type) {
case float64:
itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Value: v}
itr.buf.points[itr.buf.i] = FloatPoint{Name: name, Tags: tags, Time: time, Value: v}
case int64:
itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Value: float64(v)}
itr.buf.points[itr.buf.i] = FloatPoint{Name: name, Tags: tags, Time: time, Value: float64(v)}
default:
itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Nil: true}
itr.buf.points[itr.buf.i] = FloatPoint{Name: name, Tags: tags, Time: time, Nil: true}
}
itr.buf.filled = true
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
@ -780,15 +786,22 @@ func (itr *floatChanIterator) Next() *FloatPoint {
// Wait until either a value is available in the buffer or
// the iterator is closed.
for !itr.done && itr.buf == nil {
for !itr.done && !itr.buf.filled {
itr.cond.Wait()
}
// Return nil once the channel is done and the buffer is empty.
if itr.done && !itr.buf.filled {
itr.cond.L.Unlock()
return nil
}
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
p := itr.buf
itr.buf = nil
p := &itr.buf.points[itr.buf.i]
itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points)
itr.buf.filled = false
itr.cond.Signal()
// Do not defer the unlock so we don't create an unnecessary allocation.
@ -2538,7 +2551,11 @@ func (itr *integerAuxIterator) stream() {
// integerChanIterator represents a new instance of integerChanIterator.
type integerChanIterator struct {
buf *IntegerPoint
buf struct {
i int
filled bool
points [2]IntegerPoint
}
cond *sync.Cond
done bool
}
@ -2561,7 +2578,7 @@ func (itr *integerChanIterator) setBuf(name string, tags Tags, time int64, value
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
for !itr.done && itr.buf != nil {
for !itr.done && itr.buf.filled {
itr.cond.Wait()
}
@ -2574,11 +2591,13 @@ func (itr *integerChanIterator) setBuf(name string, tags Tags, time int64, value
switch v := value.(type) {
case int64:
itr.buf = &IntegerPoint{Name: name, Tags: tags, Time: time, Value: v}
itr.buf.points[itr.buf.i] = IntegerPoint{Name: name, Tags: tags, Time: time, Value: v}
default:
itr.buf = &IntegerPoint{Name: name, Tags: tags, Time: time, Nil: true}
itr.buf.points[itr.buf.i] = IntegerPoint{Name: name, Tags: tags, Time: time, Nil: true}
}
itr.buf.filled = true
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
@ -2589,15 +2608,22 @@ func (itr *integerChanIterator) Next() *IntegerPoint {
// Wait until either a value is available in the buffer or
// the iterator is closed.
for !itr.done && itr.buf == nil {
for !itr.done && !itr.buf.filled {
itr.cond.Wait()
}
// Return nil once the channel is done and the buffer is empty.
if itr.done && !itr.buf.filled {
itr.cond.L.Unlock()
return nil
}
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
p := itr.buf
itr.buf = nil
p := &itr.buf.points[itr.buf.i]
itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points)
itr.buf.filled = false
itr.cond.Signal()
// Do not defer the unlock so we don't create an unnecessary allocation.
@ -4347,7 +4373,11 @@ func (itr *stringAuxIterator) stream() {
// stringChanIterator represents a new instance of stringChanIterator.
type stringChanIterator struct {
buf *StringPoint
buf struct {
i int
filled bool
points [2]StringPoint
}
cond *sync.Cond
done bool
}
@ -4370,7 +4400,7 @@ func (itr *stringChanIterator) setBuf(name string, tags Tags, time int64, value
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
for !itr.done && itr.buf != nil {
for !itr.done && itr.buf.filled {
itr.cond.Wait()
}
@ -4383,11 +4413,13 @@ func (itr *stringChanIterator) setBuf(name string, tags Tags, time int64, value
switch v := value.(type) {
case string:
itr.buf = &StringPoint{Name: name, Tags: tags, Time: time, Value: v}
itr.buf.points[itr.buf.i] = StringPoint{Name: name, Tags: tags, Time: time, Value: v}
default:
itr.buf = &StringPoint{Name: name, Tags: tags, Time: time, Nil: true}
itr.buf.points[itr.buf.i] = StringPoint{Name: name, Tags: tags, Time: time, Nil: true}
}
itr.buf.filled = true
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
@ -4398,15 +4430,22 @@ func (itr *stringChanIterator) Next() *StringPoint {
// Wait until either a value is available in the buffer or
// the iterator is closed.
for !itr.done && itr.buf == nil {
for !itr.done && !itr.buf.filled {
itr.cond.Wait()
}
// Return nil once the channel is done and the buffer is empty.
if itr.done && !itr.buf.filled {
itr.cond.L.Unlock()
return nil
}
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
p := itr.buf
itr.buf = nil
p := &itr.buf.points[itr.buf.i]
itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points)
itr.buf.filled = false
itr.cond.Signal()
// Do not defer the unlock so we don't create an unnecessary allocation.
@ -6156,7 +6195,11 @@ func (itr *booleanAuxIterator) stream() {
// booleanChanIterator represents a new instance of booleanChanIterator.
type booleanChanIterator struct {
buf *BooleanPoint
buf struct {
i int
filled bool
points [2]BooleanPoint
}
cond *sync.Cond
done bool
}
@ -6179,7 +6222,7 @@ func (itr *booleanChanIterator) setBuf(name string, tags Tags, time int64, value
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
for !itr.done && itr.buf != nil {
for !itr.done && itr.buf.filled {
itr.cond.Wait()
}
@ -6192,11 +6235,13 @@ func (itr *booleanChanIterator) setBuf(name string, tags Tags, time int64, value
switch v := value.(type) {
case bool:
itr.buf = &BooleanPoint{Name: name, Tags: tags, Time: time, Value: v}
itr.buf.points[itr.buf.i] = BooleanPoint{Name: name, Tags: tags, Time: time, Value: v}
default:
itr.buf = &BooleanPoint{Name: name, Tags: tags, Time: time, Nil: true}
itr.buf.points[itr.buf.i] = BooleanPoint{Name: name, Tags: tags, Time: time, Nil: true}
}
itr.buf.filled = true
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
@ -6207,15 +6252,22 @@ func (itr *booleanChanIterator) Next() *BooleanPoint {
// Wait until either a value is available in the buffer or
// the iterator is closed.
for !itr.done && itr.buf == nil {
for !itr.done && !itr.buf.filled {
itr.cond.Wait()
}
// Return nil once the channel is done and the buffer is empty.
if itr.done && !itr.buf.filled {
itr.cond.L.Unlock()
return nil
}
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
p := itr.buf
itr.buf = nil
p := &itr.buf.points[itr.buf.i]
itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points)
itr.buf.filled = false
itr.cond.Signal()
// Do not defer the unlock so we don't create an unnecessary allocation.

View File

@ -725,7 +725,11 @@ func (itr *{{.name}}AuxIterator) stream() {
// {{$k.name}}ChanIterator represents a new instance of {{$k.name}}ChanIterator.
type {{$k.name}}ChanIterator struct {
buf *{{$k.Name}}Point
buf struct {
i int
filled bool
points [2]{{$k.Name}}Point
}
cond *sync.Cond
done bool
}
@ -748,7 +752,7 @@ func (itr *{{$k.name}}ChanIterator) setBuf(name string, tags Tags, time int64, v
// Wait for either the iterator to be done (so we don't have to set the value)
// or for the buffer to have been read and ready for another write.
for !itr.done && itr.buf != nil {
for !itr.done && itr.buf.filled {
itr.cond.Wait()
}
@ -761,14 +765,16 @@ func (itr *{{$k.name}}ChanIterator) setBuf(name string, tags Tags, time int64, v
switch v := value.(type) {
case {{$k.Type}}:
itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: v}
itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: v}
{{if eq $k.Name "Float"}}
case int64:
itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: float64(v)}
itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: float64(v)}
{{end}}
default:
itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Nil: true}
itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Nil: true}
}
itr.buf.filled = true
// Signal to all waiting goroutines that a new value is ready to read.
itr.cond.Signal()
return true
@ -779,15 +785,22 @@ func (itr *{{$k.name}}ChanIterator) Next() *{{$k.Name}}Point {
// Wait until either a value is available in the buffer or
// the iterator is closed.
for !itr.done && itr.buf == nil {
for !itr.done && !itr.buf.filled {
itr.cond.Wait()
}
// Return nil once the channel is done and the buffer is empty.
if itr.done && !itr.buf.filled {
itr.cond.L.Unlock()
return nil
}
// Always read from the buffer if it exists, even if the iterator
// is closed. This prevents the last value from being truncated by
// the parent iterator.
p := itr.buf
itr.buf = nil
p := &itr.buf.points[itr.buf.i]
itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points)
itr.buf.filled = false
itr.cond.Signal()
// Do not defer the unlock so we don't create an unnecessary allocation.

View File

@ -415,6 +415,41 @@ func DrainIterator(itr Iterator) {
}
}
// DrainIterators reads all points from all iterators.
func DrainIterators(itrs []Iterator) {
for {
var hasData bool
for _, itr := range itrs {
switch itr := itr.(type) {
case FloatIterator:
if p := itr.Next(); p != nil {
hasData = true
}
case IntegerIterator:
if p := itr.Next(); p != nil {
hasData = true
}
case StringIterator:
if p := itr.Next(); p != nil {
hasData = true
}
case BooleanIterator:
if p := itr.Next(); p != nil {
hasData = true
}
default:
panic(fmt.Sprintf("unsupported iterator type for draining: %T", itr))
}
}
// Exit once all iterators return a nil point.
if !hasData {
break
}
}
}
// NewReaderIterator returns an iterator that streams from a reader.
func NewReaderIterator(r io.Reader, typ DataType, stats IteratorStats) (Iterator, error) {
switch typ {

View File

@ -720,7 +720,7 @@ func (itrs Iterators) ReadAll() [][]influxql.Point {
if points == nil {
break
}
a = append(a, points)
a = append(a, influxql.Points(points).Clone())
}
// Close all iterators.

View File

@ -3,6 +3,7 @@ package influxql
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"sort"
@ -32,6 +33,31 @@ type Point interface {
// Points represents a list of points.
type Points []Point
// Clone returns a deep copy of a.
func (a Points) Clone() []Point {
other := make([]Point, len(a))
for i, p := range a {
if p == nil {
other[i] = nil
continue
}
switch p := p.(type) {
case *FloatPoint:
other[i] = p.Clone()
case *IntegerPoint:
other[i] = p.Clone()
case *StringPoint:
other[i] = p.Clone()
case *BooleanPoint:
other[i] = p.Clone()
default:
panic(fmt.Sprintf("unable to clone point: %T", p))
}
}
return other
}
// Tags represent a map of keys and values.
// It memoizes its key so it can be used efficiently during query execution.
type Tags struct {

View File

@ -2035,3 +2035,52 @@ func TestSelect_InvalidQueries(t *testing.T) {
influxql.Iterators(itrs).Close()
}
}
func BenchmarkSelect_Raw_1K(b *testing.B) { benchmarkSelectRaw(b, 1000) }
func BenchmarkSelect_Raw_100K(b *testing.B) { benchmarkSelectRaw(b, 1000000) }
func benchmarkSelectRaw(b *testing.B, pointN int) {
benchmarkSelect(b, MustParseSelectStatement(`SELECT fval FROM cpu`), NewRawBenchmarkIteratorCreator(pointN))
}
func benchmarkSelect(b *testing.B, stmt *influxql.SelectStatement, ic influxql.IteratorCreator) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
itrs, err := influxql.Select(stmt, ic, nil)
if err != nil {
b.Fatal(err)
}
influxql.DrainIterators(itrs)
}
}
// NewRawBenchmarkIteratorCreator returns a new mock iterator creator with generated fields.
func NewRawBenchmarkIteratorCreator(pointN int) *IteratorCreator {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
if opt.Expr != nil {
panic("unexpected expression")
}
p := influxql.FloatPoint{
Name: "cpu",
Aux: make([]interface{}, len(opt.Aux)),
}
for i := range opt.Aux {
switch opt.Aux[i] {
case "fval":
p.Aux[i] = float64(100)
default:
panic("unknown iterator expr: " + opt.Expr.String())
}
}
return &FloatPointGenerator{N: pointN, Fn: func(i int) *influxql.FloatPoint {
p.Time = int64(time.Duration(i) * (10 * time.Second))
return &p
}}, nil
}
return &ic
}