diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index 2e45d17ab4..ffd7d4b2e8 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -726,7 +726,11 @@ func (itr *floatAuxIterator) stream() { // floatChanIterator represents a new instance of floatChanIterator. type floatChanIterator struct { - buf *FloatPoint + buf struct { + i int + filled bool + points [2]FloatPoint + } cond *sync.Cond done bool } @@ -749,7 +753,7 @@ func (itr *floatChanIterator) setBuf(name string, tags Tags, time int64, value i // Wait for either the iterator to be done (so we don't have to set the value) // or for the buffer to have been read and ready for another write. - for !itr.done && itr.buf != nil { + for !itr.done && itr.buf.filled { itr.cond.Wait() } @@ -762,14 +766,16 @@ func (itr *floatChanIterator) setBuf(name string, tags Tags, time int64, value i switch v := value.(type) { case float64: - itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Value: v} + itr.buf.points[itr.buf.i] = FloatPoint{Name: name, Tags: tags, Time: time, Value: v} case int64: - itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Value: float64(v)} + itr.buf.points[itr.buf.i] = FloatPoint{Name: name, Tags: tags, Time: time, Value: float64(v)} default: - itr.buf = &FloatPoint{Name: name, Tags: tags, Time: time, Nil: true} + itr.buf.points[itr.buf.i] = FloatPoint{Name: name, Tags: tags, Time: time, Nil: true} } + itr.buf.filled = true + // Signal to all waiting goroutines that a new value is ready to read. itr.cond.Signal() return true @@ -780,15 +786,22 @@ func (itr *floatChanIterator) Next() *FloatPoint { // Wait until either a value is available in the buffer or // the iterator is closed. - for !itr.done && itr.buf == nil { + for !itr.done && !itr.buf.filled { itr.cond.Wait() } + // Return nil once the channel is done and the buffer is empty. + if itr.done && !itr.buf.filled { + itr.cond.L.Unlock() + return nil + } + // Always read from the buffer if it exists, even if the iterator // is closed. This prevents the last value from being truncated by // the parent iterator. - p := itr.buf - itr.buf = nil + p := &itr.buf.points[itr.buf.i] + itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points) + itr.buf.filled = false itr.cond.Signal() // Do not defer the unlock so we don't create an unnecessary allocation. @@ -2538,7 +2551,11 @@ func (itr *integerAuxIterator) stream() { // integerChanIterator represents a new instance of integerChanIterator. type integerChanIterator struct { - buf *IntegerPoint + buf struct { + i int + filled bool + points [2]IntegerPoint + } cond *sync.Cond done bool } @@ -2561,7 +2578,7 @@ func (itr *integerChanIterator) setBuf(name string, tags Tags, time int64, value // Wait for either the iterator to be done (so we don't have to set the value) // or for the buffer to have been read and ready for another write. - for !itr.done && itr.buf != nil { + for !itr.done && itr.buf.filled { itr.cond.Wait() } @@ -2574,11 +2591,13 @@ func (itr *integerChanIterator) setBuf(name string, tags Tags, time int64, value switch v := value.(type) { case int64: - itr.buf = &IntegerPoint{Name: name, Tags: tags, Time: time, Value: v} + itr.buf.points[itr.buf.i] = IntegerPoint{Name: name, Tags: tags, Time: time, Value: v} default: - itr.buf = &IntegerPoint{Name: name, Tags: tags, Time: time, Nil: true} + itr.buf.points[itr.buf.i] = IntegerPoint{Name: name, Tags: tags, Time: time, Nil: true} } + itr.buf.filled = true + // Signal to all waiting goroutines that a new value is ready to read. itr.cond.Signal() return true @@ -2589,15 +2608,22 @@ func (itr *integerChanIterator) Next() *IntegerPoint { // Wait until either a value is available in the buffer or // the iterator is closed. - for !itr.done && itr.buf == nil { + for !itr.done && !itr.buf.filled { itr.cond.Wait() } + // Return nil once the channel is done and the buffer is empty. + if itr.done && !itr.buf.filled { + itr.cond.L.Unlock() + return nil + } + // Always read from the buffer if it exists, even if the iterator // is closed. This prevents the last value from being truncated by // the parent iterator. - p := itr.buf - itr.buf = nil + p := &itr.buf.points[itr.buf.i] + itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points) + itr.buf.filled = false itr.cond.Signal() // Do not defer the unlock so we don't create an unnecessary allocation. @@ -4347,7 +4373,11 @@ func (itr *stringAuxIterator) stream() { // stringChanIterator represents a new instance of stringChanIterator. type stringChanIterator struct { - buf *StringPoint + buf struct { + i int + filled bool + points [2]StringPoint + } cond *sync.Cond done bool } @@ -4370,7 +4400,7 @@ func (itr *stringChanIterator) setBuf(name string, tags Tags, time int64, value // Wait for either the iterator to be done (so we don't have to set the value) // or for the buffer to have been read and ready for another write. - for !itr.done && itr.buf != nil { + for !itr.done && itr.buf.filled { itr.cond.Wait() } @@ -4383,11 +4413,13 @@ func (itr *stringChanIterator) setBuf(name string, tags Tags, time int64, value switch v := value.(type) { case string: - itr.buf = &StringPoint{Name: name, Tags: tags, Time: time, Value: v} + itr.buf.points[itr.buf.i] = StringPoint{Name: name, Tags: tags, Time: time, Value: v} default: - itr.buf = &StringPoint{Name: name, Tags: tags, Time: time, Nil: true} + itr.buf.points[itr.buf.i] = StringPoint{Name: name, Tags: tags, Time: time, Nil: true} } + itr.buf.filled = true + // Signal to all waiting goroutines that a new value is ready to read. itr.cond.Signal() return true @@ -4398,15 +4430,22 @@ func (itr *stringChanIterator) Next() *StringPoint { // Wait until either a value is available in the buffer or // the iterator is closed. - for !itr.done && itr.buf == nil { + for !itr.done && !itr.buf.filled { itr.cond.Wait() } + // Return nil once the channel is done and the buffer is empty. + if itr.done && !itr.buf.filled { + itr.cond.L.Unlock() + return nil + } + // Always read from the buffer if it exists, even if the iterator // is closed. This prevents the last value from being truncated by // the parent iterator. - p := itr.buf - itr.buf = nil + p := &itr.buf.points[itr.buf.i] + itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points) + itr.buf.filled = false itr.cond.Signal() // Do not defer the unlock so we don't create an unnecessary allocation. @@ -6156,7 +6195,11 @@ func (itr *booleanAuxIterator) stream() { // booleanChanIterator represents a new instance of booleanChanIterator. type booleanChanIterator struct { - buf *BooleanPoint + buf struct { + i int + filled bool + points [2]BooleanPoint + } cond *sync.Cond done bool } @@ -6179,7 +6222,7 @@ func (itr *booleanChanIterator) setBuf(name string, tags Tags, time int64, value // Wait for either the iterator to be done (so we don't have to set the value) // or for the buffer to have been read and ready for another write. - for !itr.done && itr.buf != nil { + for !itr.done && itr.buf.filled { itr.cond.Wait() } @@ -6192,11 +6235,13 @@ func (itr *booleanChanIterator) setBuf(name string, tags Tags, time int64, value switch v := value.(type) { case bool: - itr.buf = &BooleanPoint{Name: name, Tags: tags, Time: time, Value: v} + itr.buf.points[itr.buf.i] = BooleanPoint{Name: name, Tags: tags, Time: time, Value: v} default: - itr.buf = &BooleanPoint{Name: name, Tags: tags, Time: time, Nil: true} + itr.buf.points[itr.buf.i] = BooleanPoint{Name: name, Tags: tags, Time: time, Nil: true} } + itr.buf.filled = true + // Signal to all waiting goroutines that a new value is ready to read. itr.cond.Signal() return true @@ -6207,15 +6252,22 @@ func (itr *booleanChanIterator) Next() *BooleanPoint { // Wait until either a value is available in the buffer or // the iterator is closed. - for !itr.done && itr.buf == nil { + for !itr.done && !itr.buf.filled { itr.cond.Wait() } + // Return nil once the channel is done and the buffer is empty. + if itr.done && !itr.buf.filled { + itr.cond.L.Unlock() + return nil + } + // Always read from the buffer if it exists, even if the iterator // is closed. This prevents the last value from being truncated by // the parent iterator. - p := itr.buf - itr.buf = nil + p := &itr.buf.points[itr.buf.i] + itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points) + itr.buf.filled = false itr.cond.Signal() // Do not defer the unlock so we don't create an unnecessary allocation. diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 397ed9e602..53085dd460 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -725,7 +725,11 @@ func (itr *{{.name}}AuxIterator) stream() { // {{$k.name}}ChanIterator represents a new instance of {{$k.name}}ChanIterator. type {{$k.name}}ChanIterator struct { - buf *{{$k.Name}}Point + buf struct { + i int + filled bool + points [2]{{$k.Name}}Point + } cond *sync.Cond done bool } @@ -748,7 +752,7 @@ func (itr *{{$k.name}}ChanIterator) setBuf(name string, tags Tags, time int64, v // Wait for either the iterator to be done (so we don't have to set the value) // or for the buffer to have been read and ready for another write. - for !itr.done && itr.buf != nil { + for !itr.done && itr.buf.filled { itr.cond.Wait() } @@ -761,14 +765,16 @@ func (itr *{{$k.name}}ChanIterator) setBuf(name string, tags Tags, time int64, v switch v := value.(type) { case {{$k.Type}}: - itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: v} + itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: v} {{if eq $k.Name "Float"}} case int64: - itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: float64(v)} + itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Value: float64(v)} {{end}} default: - itr.buf = &{{$k.Name}}Point{Name: name, Tags: tags, Time: time, Nil: true} + itr.buf.points[itr.buf.i] = {{$k.Name}}Point{Name: name, Tags: tags, Time: time, Nil: true} } + itr.buf.filled = true + // Signal to all waiting goroutines that a new value is ready to read. itr.cond.Signal() return true @@ -779,15 +785,22 @@ func (itr *{{$k.name}}ChanIterator) Next() *{{$k.Name}}Point { // Wait until either a value is available in the buffer or // the iterator is closed. - for !itr.done && itr.buf == nil { + for !itr.done && !itr.buf.filled { itr.cond.Wait() } + // Return nil once the channel is done and the buffer is empty. + if itr.done && !itr.buf.filled { + itr.cond.L.Unlock() + return nil + } + // Always read from the buffer if it exists, even if the iterator // is closed. This prevents the last value from being truncated by // the parent iterator. - p := itr.buf - itr.buf = nil + p := &itr.buf.points[itr.buf.i] + itr.buf.i = (itr.buf.i + 1) % len(itr.buf.points) + itr.buf.filled = false itr.cond.Signal() // Do not defer the unlock so we don't create an unnecessary allocation. diff --git a/influxql/iterator.go b/influxql/iterator.go index 3df58421dd..890a42d1f5 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -415,6 +415,41 @@ func DrainIterator(itr Iterator) { } } +// DrainIterators reads all points from all iterators. +func DrainIterators(itrs []Iterator) { + for { + var hasData bool + + for _, itr := range itrs { + switch itr := itr.(type) { + case FloatIterator: + if p := itr.Next(); p != nil { + hasData = true + } + case IntegerIterator: + if p := itr.Next(); p != nil { + hasData = true + } + case StringIterator: + if p := itr.Next(); p != nil { + hasData = true + } + case BooleanIterator: + if p := itr.Next(); p != nil { + hasData = true + } + default: + panic(fmt.Sprintf("unsupported iterator type for draining: %T", itr)) + } + } + + // Exit once all iterators return a nil point. + if !hasData { + break + } + } +} + // NewReaderIterator returns an iterator that streams from a reader. func NewReaderIterator(r io.Reader, typ DataType, stats IteratorStats) (Iterator, error) { switch typ { diff --git a/influxql/iterator_test.go b/influxql/iterator_test.go index 75163b43c3..8689a271f2 100644 --- a/influxql/iterator_test.go +++ b/influxql/iterator_test.go @@ -720,7 +720,7 @@ func (itrs Iterators) ReadAll() [][]influxql.Point { if points == nil { break } - a = append(a, points) + a = append(a, influxql.Points(points).Clone()) } // Close all iterators. diff --git a/influxql/point.go b/influxql/point.go index e1c88a9733..613d2d6d7d 100644 --- a/influxql/point.go +++ b/influxql/point.go @@ -3,6 +3,7 @@ package influxql import ( "bytes" "encoding/binary" + "fmt" "io" "sort" @@ -32,6 +33,31 @@ type Point interface { // Points represents a list of points. type Points []Point +// Clone returns a deep copy of a. +func (a Points) Clone() []Point { + other := make([]Point, len(a)) + for i, p := range a { + if p == nil { + other[i] = nil + continue + } + + switch p := p.(type) { + case *FloatPoint: + other[i] = p.Clone() + case *IntegerPoint: + other[i] = p.Clone() + case *StringPoint: + other[i] = p.Clone() + case *BooleanPoint: + other[i] = p.Clone() + default: + panic(fmt.Sprintf("unable to clone point: %T", p)) + } + } + return other +} + // Tags represent a map of keys and values. // It memoizes its key so it can be used efficiently during query execution. type Tags struct { diff --git a/influxql/select_test.go b/influxql/select_test.go index f8f21cc3ea..3c752e8d4e 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -2035,3 +2035,52 @@ func TestSelect_InvalidQueries(t *testing.T) { influxql.Iterators(itrs).Close() } } + +func BenchmarkSelect_Raw_1K(b *testing.B) { benchmarkSelectRaw(b, 1000) } +func BenchmarkSelect_Raw_100K(b *testing.B) { benchmarkSelectRaw(b, 1000000) } + +func benchmarkSelectRaw(b *testing.B, pointN int) { + benchmarkSelect(b, MustParseSelectStatement(`SELECT fval FROM cpu`), NewRawBenchmarkIteratorCreator(pointN)) +} + +func benchmarkSelect(b *testing.B, stmt *influxql.SelectStatement, ic influxql.IteratorCreator) { + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + itrs, err := influxql.Select(stmt, ic, nil) + if err != nil { + b.Fatal(err) + } + influxql.DrainIterators(itrs) + } +} + +// NewRawBenchmarkIteratorCreator returns a new mock iterator creator with generated fields. +func NewRawBenchmarkIteratorCreator(pointN int) *IteratorCreator { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + if opt.Expr != nil { + panic("unexpected expression") + } + + p := influxql.FloatPoint{ + Name: "cpu", + Aux: make([]interface{}, len(opt.Aux)), + } + + for i := range opt.Aux { + switch opt.Aux[i] { + case "fval": + p.Aux[i] = float64(100) + default: + panic("unknown iterator expr: " + opt.Expr.String()) + } + } + + return &FloatPointGenerator{N: pointN, Fn: func(i int) *influxql.FloatPoint { + p.Time = int64(time.Duration(i) * (10 * time.Second)) + return &p + }}, nil + } + return &ic +}