Merge pull request #17596 from foobar/optimize-sorted-merge-iterator
improvement(query): performance improvement for sorted merge iteratorpull/18689/head
commit
78a05d1119
|
@ -11,6 +11,7 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -374,6 +375,7 @@ func (itr *floatSortedMergeIterator) pop() (*FloatPoint, error) {
|
|||
}
|
||||
itr.heap.items = append(itr.heap.items, item)
|
||||
}
|
||||
itr.heap.detectFast()
|
||||
heap.Init(itr.heap)
|
||||
itr.init = true
|
||||
}
|
||||
|
@ -411,11 +413,57 @@ func (itr *floatSortedMergeIterator) pop() (*FloatPoint, error) {
|
|||
type floatSortedMergeHeap struct {
|
||||
opt IteratorOptions
|
||||
items []*floatSortedMergeHeapItem
|
||||
// if each input comes from a unique single time series, we can make a shortcut.
|
||||
// detection of the shortcut introduces some overhead but it gets significant
|
||||
// performance improvement in cases like SELECT * FROM m GROUP BY *
|
||||
fast bool
|
||||
}
|
||||
|
||||
func (h *floatSortedMergeHeap) detectFast() {
|
||||
for _, item := range h.items {
|
||||
if item.itr.Stats().SeriesN != 1 {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
hasDup := false
|
||||
s := make([]*floatSortedMergeHeapItem, len(h.items))
|
||||
copy(s, h.items)
|
||||
|
||||
less := func(i, j int) bool {
|
||||
x, y := s[i].point, s[j].point
|
||||
ret := strings.Compare(x.Name, y.Name)
|
||||
if ret == 0 {
|
||||
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
|
||||
}
|
||||
if ret != 0 {
|
||||
// TT
|
||||
// ret | == -1 | h.opt.Ascending | result
|
||||
// 1 | false | false | true
|
||||
// -1 | true | false | false
|
||||
// 1 | false | true | false
|
||||
// -1 | true | true | true
|
||||
return ret == -1 == h.opt.Ascending
|
||||
}
|
||||
hasDup = true
|
||||
return false
|
||||
}
|
||||
sort.Slice(s, less)
|
||||
if !hasDup {
|
||||
h.fast = true
|
||||
for i, item := range s {
|
||||
item.fastIdx = i
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *floatSortedMergeHeap) Len() int { return len(h.items) }
|
||||
func (h *floatSortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
|
||||
func (h *floatSortedMergeHeap) Less(i, j int) bool {
|
||||
if h.fast {
|
||||
return h.items[i].fastIdx < h.items[j].fastIdx
|
||||
}
|
||||
|
||||
x, y := h.items[i].point, h.items[j].point
|
||||
|
||||
if h.opt.Ascending {
|
||||
|
@ -489,6 +537,8 @@ type floatSortedMergeHeapItem struct {
|
|||
point *FloatPoint
|
||||
err error
|
||||
itr FloatIterator
|
||||
// index for fast shortcut
|
||||
fastIdx int
|
||||
}
|
||||
|
||||
// floatIteratorScanner scans the results of a FloatIterator into a map.
|
||||
|
@ -3038,6 +3088,7 @@ func (itr *integerSortedMergeIterator) pop() (*IntegerPoint, error) {
|
|||
}
|
||||
itr.heap.items = append(itr.heap.items, item)
|
||||
}
|
||||
itr.heap.detectFast()
|
||||
heap.Init(itr.heap)
|
||||
itr.init = true
|
||||
}
|
||||
|
@ -3075,11 +3126,57 @@ func (itr *integerSortedMergeIterator) pop() (*IntegerPoint, error) {
|
|||
type integerSortedMergeHeap struct {
|
||||
opt IteratorOptions
|
||||
items []*integerSortedMergeHeapItem
|
||||
// if each input comes from a unique single time series, we can make a shortcut.
|
||||
// detection of the shortcut introduces some overhead but it gets significant
|
||||
// performance improvement in cases like SELECT * FROM m GROUP BY *
|
||||
fast bool
|
||||
}
|
||||
|
||||
func (h *integerSortedMergeHeap) detectFast() {
|
||||
for _, item := range h.items {
|
||||
if item.itr.Stats().SeriesN != 1 {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
hasDup := false
|
||||
s := make([]*integerSortedMergeHeapItem, len(h.items))
|
||||
copy(s, h.items)
|
||||
|
||||
less := func(i, j int) bool {
|
||||
x, y := s[i].point, s[j].point
|
||||
ret := strings.Compare(x.Name, y.Name)
|
||||
if ret == 0 {
|
||||
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
|
||||
}
|
||||
if ret != 0 {
|
||||
// TT
|
||||
// ret | == -1 | h.opt.Ascending | result
|
||||
// 1 | false | false | true
|
||||
// -1 | true | false | false
|
||||
// 1 | false | true | false
|
||||
// -1 | true | true | true
|
||||
return ret == -1 == h.opt.Ascending
|
||||
}
|
||||
hasDup = true
|
||||
return false
|
||||
}
|
||||
sort.Slice(s, less)
|
||||
if !hasDup {
|
||||
h.fast = true
|
||||
for i, item := range s {
|
||||
item.fastIdx = i
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *integerSortedMergeHeap) Len() int { return len(h.items) }
|
||||
func (h *integerSortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
|
||||
func (h *integerSortedMergeHeap) Less(i, j int) bool {
|
||||
if h.fast {
|
||||
return h.items[i].fastIdx < h.items[j].fastIdx
|
||||
}
|
||||
|
||||
x, y := h.items[i].point, h.items[j].point
|
||||
|
||||
if h.opt.Ascending {
|
||||
|
@ -3153,6 +3250,8 @@ type integerSortedMergeHeapItem struct {
|
|||
point *IntegerPoint
|
||||
err error
|
||||
itr IntegerIterator
|
||||
// index for fast shortcut
|
||||
fastIdx int
|
||||
}
|
||||
|
||||
// integerIteratorScanner scans the results of a IntegerIterator into a map.
|
||||
|
@ -5702,6 +5801,7 @@ func (itr *unsignedSortedMergeIterator) pop() (*UnsignedPoint, error) {
|
|||
}
|
||||
itr.heap.items = append(itr.heap.items, item)
|
||||
}
|
||||
itr.heap.detectFast()
|
||||
heap.Init(itr.heap)
|
||||
itr.init = true
|
||||
}
|
||||
|
@ -5739,11 +5839,57 @@ func (itr *unsignedSortedMergeIterator) pop() (*UnsignedPoint, error) {
|
|||
type unsignedSortedMergeHeap struct {
|
||||
opt IteratorOptions
|
||||
items []*unsignedSortedMergeHeapItem
|
||||
// if each input comes from a unique single time series, we can make a shortcut.
|
||||
// detection of the shortcut introduces some overhead but it gets significant
|
||||
// performance improvement in cases like SELECT * FROM m GROUP BY *
|
||||
fast bool
|
||||
}
|
||||
|
||||
func (h *unsignedSortedMergeHeap) detectFast() {
|
||||
for _, item := range h.items {
|
||||
if item.itr.Stats().SeriesN != 1 {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
hasDup := false
|
||||
s := make([]*unsignedSortedMergeHeapItem, len(h.items))
|
||||
copy(s, h.items)
|
||||
|
||||
less := func(i, j int) bool {
|
||||
x, y := s[i].point, s[j].point
|
||||
ret := strings.Compare(x.Name, y.Name)
|
||||
if ret == 0 {
|
||||
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
|
||||
}
|
||||
if ret != 0 {
|
||||
// TT
|
||||
// ret | == -1 | h.opt.Ascending | result
|
||||
// 1 | false | false | true
|
||||
// -1 | true | false | false
|
||||
// 1 | false | true | false
|
||||
// -1 | true | true | true
|
||||
return ret == -1 == h.opt.Ascending
|
||||
}
|
||||
hasDup = true
|
||||
return false
|
||||
}
|
||||
sort.Slice(s, less)
|
||||
if !hasDup {
|
||||
h.fast = true
|
||||
for i, item := range s {
|
||||
item.fastIdx = i
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *unsignedSortedMergeHeap) Len() int { return len(h.items) }
|
||||
func (h *unsignedSortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
|
||||
func (h *unsignedSortedMergeHeap) Less(i, j int) bool {
|
||||
if h.fast {
|
||||
return h.items[i].fastIdx < h.items[j].fastIdx
|
||||
}
|
||||
|
||||
x, y := h.items[i].point, h.items[j].point
|
||||
|
||||
if h.opt.Ascending {
|
||||
|
@ -5817,6 +5963,8 @@ type unsignedSortedMergeHeapItem struct {
|
|||
point *UnsignedPoint
|
||||
err error
|
||||
itr UnsignedIterator
|
||||
// index for fast shortcut
|
||||
fastIdx int
|
||||
}
|
||||
|
||||
// unsignedIteratorScanner scans the results of a UnsignedIterator into a map.
|
||||
|
@ -8366,6 +8514,7 @@ func (itr *stringSortedMergeIterator) pop() (*StringPoint, error) {
|
|||
}
|
||||
itr.heap.items = append(itr.heap.items, item)
|
||||
}
|
||||
itr.heap.detectFast()
|
||||
heap.Init(itr.heap)
|
||||
itr.init = true
|
||||
}
|
||||
|
@ -8403,11 +8552,57 @@ func (itr *stringSortedMergeIterator) pop() (*StringPoint, error) {
|
|||
type stringSortedMergeHeap struct {
|
||||
opt IteratorOptions
|
||||
items []*stringSortedMergeHeapItem
|
||||
// if each input comes from a unique single time series, we can make a shortcut.
|
||||
// detection of the shortcut introduces some overhead but it gets significant
|
||||
// performance improvement in cases like SELECT * FROM m GROUP BY *
|
||||
fast bool
|
||||
}
|
||||
|
||||
func (h *stringSortedMergeHeap) detectFast() {
|
||||
for _, item := range h.items {
|
||||
if item.itr.Stats().SeriesN != 1 {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
hasDup := false
|
||||
s := make([]*stringSortedMergeHeapItem, len(h.items))
|
||||
copy(s, h.items)
|
||||
|
||||
less := func(i, j int) bool {
|
||||
x, y := s[i].point, s[j].point
|
||||
ret := strings.Compare(x.Name, y.Name)
|
||||
if ret == 0 {
|
||||
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
|
||||
}
|
||||
if ret != 0 {
|
||||
// TT
|
||||
// ret | == -1 | h.opt.Ascending | result
|
||||
// 1 | false | false | true
|
||||
// -1 | true | false | false
|
||||
// 1 | false | true | false
|
||||
// -1 | true | true | true
|
||||
return ret == -1 == h.opt.Ascending
|
||||
}
|
||||
hasDup = true
|
||||
return false
|
||||
}
|
||||
sort.Slice(s, less)
|
||||
if !hasDup {
|
||||
h.fast = true
|
||||
for i, item := range s {
|
||||
item.fastIdx = i
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *stringSortedMergeHeap) Len() int { return len(h.items) }
|
||||
func (h *stringSortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
|
||||
func (h *stringSortedMergeHeap) Less(i, j int) bool {
|
||||
if h.fast {
|
||||
return h.items[i].fastIdx < h.items[j].fastIdx
|
||||
}
|
||||
|
||||
x, y := h.items[i].point, h.items[j].point
|
||||
|
||||
if h.opt.Ascending {
|
||||
|
@ -8481,6 +8676,8 @@ type stringSortedMergeHeapItem struct {
|
|||
point *StringPoint
|
||||
err error
|
||||
itr StringIterator
|
||||
// index for fast shortcut
|
||||
fastIdx int
|
||||
}
|
||||
|
||||
// stringIteratorScanner scans the results of a StringIterator into a map.
|
||||
|
@ -11016,6 +11213,7 @@ func (itr *booleanSortedMergeIterator) pop() (*BooleanPoint, error) {
|
|||
}
|
||||
itr.heap.items = append(itr.heap.items, item)
|
||||
}
|
||||
itr.heap.detectFast()
|
||||
heap.Init(itr.heap)
|
||||
itr.init = true
|
||||
}
|
||||
|
@ -11053,11 +11251,57 @@ func (itr *booleanSortedMergeIterator) pop() (*BooleanPoint, error) {
|
|||
type booleanSortedMergeHeap struct {
|
||||
opt IteratorOptions
|
||||
items []*booleanSortedMergeHeapItem
|
||||
// if each input comes from a unique single time series, we can make a shortcut.
|
||||
// detection of the shortcut introduces some overhead but it gets significant
|
||||
// performance improvement in cases like SELECT * FROM m GROUP BY *
|
||||
fast bool
|
||||
}
|
||||
|
||||
func (h *booleanSortedMergeHeap) detectFast() {
|
||||
for _, item := range h.items {
|
||||
if item.itr.Stats().SeriesN != 1 {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
hasDup := false
|
||||
s := make([]*booleanSortedMergeHeapItem, len(h.items))
|
||||
copy(s, h.items)
|
||||
|
||||
less := func(i, j int) bool {
|
||||
x, y := s[i].point, s[j].point
|
||||
ret := strings.Compare(x.Name, y.Name)
|
||||
if ret == 0 {
|
||||
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
|
||||
}
|
||||
if ret != 0 {
|
||||
// TT
|
||||
// ret | == -1 | h.opt.Ascending | result
|
||||
// 1 | false | false | true
|
||||
// -1 | true | false | false
|
||||
// 1 | false | true | false
|
||||
// -1 | true | true | true
|
||||
return ret == -1 == h.opt.Ascending
|
||||
}
|
||||
hasDup = true
|
||||
return false
|
||||
}
|
||||
sort.Slice(s, less)
|
||||
if !hasDup {
|
||||
h.fast = true
|
||||
for i, item := range s {
|
||||
item.fastIdx = i
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *booleanSortedMergeHeap) Len() int { return len(h.items) }
|
||||
func (h *booleanSortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
|
||||
func (h *booleanSortedMergeHeap) Less(i, j int) bool {
|
||||
if h.fast {
|
||||
return h.items[i].fastIdx < h.items[j].fastIdx
|
||||
}
|
||||
|
||||
x, y := h.items[i].point, h.items[j].point
|
||||
|
||||
if h.opt.Ascending {
|
||||
|
@ -11131,6 +11375,8 @@ type booleanSortedMergeHeapItem struct {
|
|||
point *BooleanPoint
|
||||
err error
|
||||
itr BooleanIterator
|
||||
// index for fast shortcut
|
||||
fastIdx int
|
||||
}
|
||||
|
||||
// booleanIteratorScanner scans the results of a BooleanIterator into a map.
|
||||
|
|
|
@ -5,9 +5,9 @@ import (
|
|||
"container/heap"
|
||||
"io"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"sync"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/influxdata/influxql"
|
||||
|
@ -373,6 +373,7 @@ func (itr *{{$k.name}}SortedMergeIterator) pop() (*{{$k.Name}}Point, error) {
|
|||
}
|
||||
itr.heap.items = append(itr.heap.items, item)
|
||||
}
|
||||
itr.heap.detectFast()
|
||||
heap.Init(itr.heap)
|
||||
itr.init = true
|
||||
}
|
||||
|
@ -410,11 +411,57 @@ func (itr *{{$k.name}}SortedMergeIterator) pop() (*{{$k.Name}}Point, error) {
|
|||
type {{$k.name}}SortedMergeHeap struct {
|
||||
opt IteratorOptions
|
||||
items []*{{$k.name}}SortedMergeHeapItem
|
||||
// if each input comes from a unique single time series, we can make a shortcut.
|
||||
// detection of the shortcut introduces some overhead but it gets significant
|
||||
// performance improvement in cases like SELECT * FROM m GROUP BY *
|
||||
fast bool
|
||||
}
|
||||
|
||||
func (h *{{$k.name}}SortedMergeHeap) detectFast() {
|
||||
for _, item := range h.items {
|
||||
if item.itr.Stats().SeriesN != 1 {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
hasDup := false
|
||||
s := make([]*{{$k.name}}SortedMergeHeapItem, len(h.items))
|
||||
copy(s, h.items)
|
||||
|
||||
less := func(i, j int) bool {
|
||||
x, y := s[i].point, s[j].point
|
||||
ret := strings.Compare(x.Name, y.Name)
|
||||
if ret == 0 {
|
||||
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
|
||||
}
|
||||
if ret != 0 {
|
||||
// TT
|
||||
// ret | == -1 | h.opt.Ascending | result
|
||||
// 1 | false | false | true
|
||||
// -1 | true | false | false
|
||||
// 1 | false | true | false
|
||||
// -1 | true | true | true
|
||||
return ret == -1 == h.opt.Ascending
|
||||
}
|
||||
hasDup = true
|
||||
return false
|
||||
}
|
||||
sort.Slice(s, less)
|
||||
if !hasDup {
|
||||
h.fast = true
|
||||
for i, item := range s {
|
||||
item.fastIdx = i
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *{{$k.name}}SortedMergeHeap) Len() int { return len(h.items) }
|
||||
func (h *{{$k.name}}SortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
|
||||
func (h *{{$k.name}}SortedMergeHeap) Less(i, j int) bool {
|
||||
if h.fast {
|
||||
return h.items[i].fastIdx < h.items[j].fastIdx
|
||||
}
|
||||
|
||||
x, y := h.items[i].point, h.items[j].point
|
||||
|
||||
if h.opt.Ascending {
|
||||
|
@ -488,6 +535,8 @@ type {{$k.name}}SortedMergeHeapItem struct {
|
|||
point *{{$k.Name}}Point
|
||||
err error
|
||||
itr {{$k.Name}}Iterator
|
||||
// index for fast shortcut
|
||||
fastIdx int
|
||||
}
|
||||
|
||||
// {{$k.name}}IteratorScanner scans the results of a {{$k.Name}}Iterator into a map.
|
||||
|
|
|
@ -0,0 +1,306 @@
|
|||
package query
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
// a simple FloatIterator for testing
|
||||
type floatIterator struct {
|
||||
points []FloatPoint
|
||||
closed bool
|
||||
stats IteratorStats
|
||||
}
|
||||
|
||||
func (itr *floatIterator) Stats() IteratorStats { return itr.stats }
|
||||
func (itr *floatIterator) Close() error { itr.closed = true; return nil }
|
||||
|
||||
// Next returns the next value and shifts it off the beginning of the points slice.
|
||||
func (itr *floatIterator) Next() (*FloatPoint, error) {
|
||||
if len(itr.points) == 0 || itr.closed {
|
||||
return nil, nil
|
||||
}
|
||||
v := &itr.points[0]
|
||||
itr.points = itr.points[1:]
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func TestSortedMergeHeap_DetectFast(t *testing.T) {
|
||||
|
||||
suite := []*struct {
|
||||
inputs []FloatIterator
|
||||
ascending bool
|
||||
fast bool // expected status
|
||||
}{
|
||||
|
||||
// case 0
|
||||
{
|
||||
inputs: []FloatIterator{
|
||||
&floatIterator{
|
||||
points: []FloatPoint{
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 0, Value: 1},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 12, Value: 3},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 30, Value: 4},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "B"}), Time: 40, Value: 2},
|
||||
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 50, Value: 8},
|
||||
},
|
||||
stats: IteratorStats{SeriesN: 3},
|
||||
},
|
||||
&floatIterator{
|
||||
points: []FloatPoint{
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 0, Value: 1},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 12, Value: 3},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 30, Value: 4},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "B"}), Time: 40, Value: 2},
|
||||
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 50, Value: 8},
|
||||
},
|
||||
stats: IteratorStats{SeriesN: 3},
|
||||
},
|
||||
},
|
||||
ascending: true,
|
||||
fast: false,
|
||||
},
|
||||
// case 1
|
||||
{
|
||||
inputs: []FloatIterator{
|
||||
&floatIterator{
|
||||
points: []FloatPoint{
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 0, Value: 1},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 12, Value: 3},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 30, Value: 4},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 61, Value: 8},
|
||||
},
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
},
|
||||
&floatIterator{
|
||||
points: []FloatPoint{
|
||||
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 0, Value: 1},
|
||||
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 12, Value: 3},
|
||||
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 30, Value: 4},
|
||||
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 4, Value: 2},
|
||||
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 61, Value: 8},
|
||||
},
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
},
|
||||
},
|
||||
ascending: false,
|
||||
fast: true,
|
||||
},
|
||||
// case 2
|
||||
{
|
||||
inputs: []FloatIterator{
|
||||
&floatIterator{
|
||||
points: []FloatPoint{
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 0, Value: 1},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 12, Value: 3},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 30, Value: 4},
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 51, Value: 8},
|
||||
},
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
},
|
||||
&floatIterator{
|
||||
points: []FloatPoint{
|
||||
{Name: "cpu", Tags: NewTags(map[string]string{"host": "B"}), Time: 1, Value: 8},
|
||||
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 10, Value: 1},
|
||||
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 12, Value: 3},
|
||||
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 30, Value: 4},
|
||||
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 40, Value: 2},
|
||||
},
|
||||
stats: IteratorStats{SeriesN: 2},
|
||||
},
|
||||
},
|
||||
ascending: true,
|
||||
fast: false,
|
||||
},
|
||||
}
|
||||
|
||||
for i, c := range suite {
|
||||
h := createFloatSortedMergeHeap(
|
||||
c.inputs,
|
||||
IteratorOptions{
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
Ascending: c.ascending,
|
||||
})
|
||||
h.detectFast()
|
||||
if h.fast != c.fast {
|
||||
t.Fatalf("unexpected shortcut status for sorted merge heap, case %d", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createFloatSortedMergeHeap(inputs []FloatIterator, opt IteratorOptions) *floatSortedMergeHeap {
|
||||
h := &floatSortedMergeHeap{
|
||||
items: make([]*floatSortedMergeHeapItem, 0, len(inputs)),
|
||||
opt: opt,
|
||||
}
|
||||
|
||||
items2 := make([]*floatSortedMergeHeapItem, 0, len(inputs))
|
||||
for _, input := range inputs {
|
||||
items2 = append(items2, &floatSortedMergeHeapItem{itr: input})
|
||||
}
|
||||
for _, item := range items2 {
|
||||
var err error
|
||||
if item.point, err = item.itr.Next(); err != nil {
|
||||
panic(err)
|
||||
} else if item.point == nil {
|
||||
continue
|
||||
}
|
||||
h.items = append(h.items, item)
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
// a simple iterator that has only a single series
|
||||
type simpleFloatIterator struct {
|
||||
point FloatPoint
|
||||
size int
|
||||
populated int
|
||||
stats IteratorStats
|
||||
}
|
||||
|
||||
func (itr *simpleFloatIterator) Stats() IteratorStats {
|
||||
return itr.stats
|
||||
}
|
||||
|
||||
func (itr *simpleFloatIterator) Close() error { itr.populated = itr.size; return nil }
|
||||
func (itr *simpleFloatIterator) Next() (*FloatPoint, error) {
|
||||
if itr.populated >= itr.size {
|
||||
return nil, nil
|
||||
}
|
||||
p := itr.point.Clone()
|
||||
p.Time += int64(itr.populated * 1000)
|
||||
itr.populated++
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func BenchmarkSortedMergeIterator_Fast(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
sortedMergeIterFast()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSortedMergeIterator_NotFast(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
sortedMergeIterNotFast()
|
||||
}
|
||||
}
|
||||
|
||||
func sortedMergeIterFast() {
|
||||
inputs := []Iterator{}
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "one"}), Time: 10, Value: 2},
|
||||
size: 10000,
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
})
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "two"}), Time: 10, Value: 2},
|
||||
size: 10000,
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
})
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "three"}), Time: 10, Value: 2},
|
||||
size: 10000,
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
})
|
||||
|
||||
itr := NewSortedMergeIterator(inputs, IteratorOptions{}).(*floatSortedMergeIterator)
|
||||
p, _ := itr.Next()
|
||||
for p != nil {
|
||||
p, _ = itr.Next()
|
||||
}
|
||||
}
|
||||
|
||||
func sortedMergeIterNotFast() {
|
||||
inputs := []Iterator{}
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "four"}), Time: 10, Value: 2},
|
||||
size: 10000,
|
||||
stats: IteratorStats{SeriesN: 2},
|
||||
})
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "five"}), Time: 10, Value: 2},
|
||||
size: 10000,
|
||||
stats: IteratorStats{SeriesN: 2},
|
||||
})
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "fix"}), Time: 10, Value: 2},
|
||||
size: 10000,
|
||||
stats: IteratorStats{SeriesN: 2},
|
||||
})
|
||||
|
||||
opt := IteratorOptions{
|
||||
Dimensions: []string{"taga", "tagb", "tagc"},
|
||||
}
|
||||
itr := NewSortedMergeIterator(inputs, opt).(*floatSortedMergeIterator)
|
||||
p, _ := itr.Next()
|
||||
for p != nil {
|
||||
p, _ = itr.Next()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSortedMergeIterator_FastCheckOverhead(b *testing.B) {
|
||||
inputs := []FloatIterator{}
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "one"}), Time: 10, Value: 2},
|
||||
size: 10000,
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
})
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "two"}), Time: 10, Value: 2},
|
||||
size: 10000,
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
})
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "three"}), Time: 10, Value: 2},
|
||||
size: 10000,
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
})
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "four"}), Time: 10, Value: 2},
|
||||
size: 1000000,
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
})
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "five"}), Time: 10, Value: 2},
|
||||
size: 1000000,
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
})
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "fix"}), Time: 10, Value: 2},
|
||||
size: 1000000,
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
})
|
||||
inputs = append(inputs,
|
||||
&simpleFloatIterator{
|
||||
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "one"}), Time: 10, Value: 2},
|
||||
size: 10000,
|
||||
stats: IteratorStats{SeriesN: 1},
|
||||
})
|
||||
h := createFloatSortedMergeHeap(
|
||||
inputs,
|
||||
IteratorOptions{
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
})
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
h.detectFast()
|
||||
if h.fast {
|
||||
panic("unexpected shortcut")
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue