Sort MergeIterator by tags after name and before the window

pull/5196/head
Jonathan A. Sternberg 2016-02-04 16:13:43 -05:00 committed by Ben Johnson
parent 47c2bab74b
commit 21d2a4c3de
3 changed files with 98 additions and 61 deletions

View File

@ -104,6 +104,7 @@ type floatMergeIterator struct {
curr *floatMergeHeapItem
window struct {
name string
tags string
startTime int64
endTime int64
}
@ -155,7 +156,7 @@ func (itr *floatMergeIterator) Next() *FloatPoint {
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.name = p.Name
itr.window.name, itr.window.tags = p.Name, p.Tags.ID()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
@ -173,6 +174,8 @@ func (itr *floatMergeIterator) Next() *FloatPoint {
inWindow := true
if itr.window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
@ -206,10 +209,14 @@ func (h floatMergeHeap) Less(i, j int) bool {
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() < y.Tags.ID()
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() > y.Tags.ID()
}
}
@ -805,6 +812,7 @@ type integerMergeIterator struct {
curr *integerMergeHeapItem
window struct {
name string
tags string
startTime int64
endTime int64
}
@ -856,7 +864,7 @@ func (itr *integerMergeIterator) Next() *IntegerPoint {
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.name = p.Name
itr.window.name, itr.window.tags = p.Name, p.Tags.ID()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
@ -874,6 +882,8 @@ func (itr *integerMergeIterator) Next() *IntegerPoint {
inWindow := true
if itr.window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
@ -907,10 +917,14 @@ func (h integerMergeHeap) Less(i, j int) bool {
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() < y.Tags.ID()
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() > y.Tags.ID()
}
}
@ -1506,6 +1520,7 @@ type stringMergeIterator struct {
curr *stringMergeHeapItem
window struct {
name string
tags string
startTime int64
endTime int64
}
@ -1557,7 +1572,7 @@ func (itr *stringMergeIterator) Next() *StringPoint {
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.name = p.Name
itr.window.name, itr.window.tags = p.Name, p.Tags.ID()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
@ -1575,6 +1590,8 @@ func (itr *stringMergeIterator) Next() *StringPoint {
inWindow := true
if itr.window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
@ -1608,10 +1625,14 @@ func (h stringMergeHeap) Less(i, j int) bool {
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() < y.Tags.ID()
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() > y.Tags.ID()
}
}
@ -2207,6 +2228,7 @@ type booleanMergeIterator struct {
curr *booleanMergeHeapItem
window struct {
name string
tags string
startTime int64
endTime int64
}
@ -2258,7 +2280,7 @@ func (itr *booleanMergeIterator) Next() *BooleanPoint {
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.name = p.Name
itr.window.name, itr.window.tags = p.Name, p.Tags.ID()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
@ -2276,6 +2298,8 @@ func (itr *booleanMergeIterator) Next() *BooleanPoint {
inWindow := true
if itr.window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
@ -2309,10 +2333,14 @@ func (h booleanMergeHeap) Less(i, j int) bool {
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() < y.Tags.ID()
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() > y.Tags.ID()
}
}

View File

@ -104,6 +104,7 @@ type {{.name}}MergeIterator struct {
curr *{{.name}}MergeHeapItem
window struct {
name string
tags string
startTime int64
endTime int64
}
@ -155,7 +156,7 @@ func (itr *{{.name}}MergeIterator) Next() *{{.Name}}Point {
// Read point and set current window.
p := itr.curr.itr.Next()
itr.window.name = p.Name
itr.window.name, itr.window.tags = p.Name, p.Tags.ID()
itr.window.startTime, itr.window.endTime = itr.heap.opt.Window(p.Time)
return p
}
@ -173,6 +174,8 @@ func (itr *{{.name}}MergeIterator) Next() *{{.Name}}Point {
inWindow := true
if itr.window.name != p.Name {
inWindow = false
} else if itr.window.tags != p.Tags.ID() {
inWindow = false
} else if itr.heap.opt.Ascending && p.Time >= itr.window.endTime {
inWindow = false
} else if !itr.heap.opt.Ascending && p.Time < itr.window.startTime {
@ -206,10 +209,14 @@ func (h {{.name}}MergeHeap) Less(i, j int) bool {
if h.opt.Ascending {
if x.Name != y.Name {
return x.Name < y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() < y.Tags.ID()
}
} else {
if x.Name != y.Name {
return x.Name > y.Name
} else if x.Tags.ID() != y.Tags.ID() {
return x.Tags.ID() > y.Tags.ID()
}
}

View File

@ -17,15 +17,15 @@ func TestMergeIterator_Float(t *testing.T) {
inputs := []*FloatIterator{
{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8},
}},
{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9},
}},
{Points: []influxql.FloatPoint{}},
@ -40,14 +40,14 @@ func TestMergeIterator_Float(t *testing.T) {
})
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}},
{&influxql.FloatPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}},
{&influxql.FloatPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}},
{&influxql.FloatPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}},
}) {
t.Errorf("unexpected points: %s", spew.Sdump(a))
}
@ -64,15 +64,15 @@ func TestMergeIterator_Integer(t *testing.T) {
inputs := []*IntegerIterator{
{Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8},
}},
{Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9},
}},
{Points: []influxql.IntegerPoint{}},
@ -84,17 +84,17 @@ func TestMergeIterator_Integer(t *testing.T) {
Ascending: true,
})
if a, ok := CompareIntegerIterator(itr, []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}},
{&influxql.IntegerPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}},
{&influxql.IntegerPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}},
}) {
t.Errorf("unexpected points: %s", spew.Sdump(a))
}
@ -110,15 +110,15 @@ func TestMergeIterator_String(t *testing.T) {
inputs := []*StringIterator{
{Points: []influxql.StringPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: "h"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: "c"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: "d"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: "h"},
}},
{Points: []influxql.StringPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: "e"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: "f"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"},
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: "i"},
}},
{Points: []influxql.StringPoint{}},
@ -129,17 +129,18 @@ func TestMergeIterator_String(t *testing.T) {
},
Ascending: true,
})
if a, ok := CompareStringIterator(itr, []influxql.StringPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: "e"},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: "f"},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: "h"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: "c"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"},
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: "i"},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: "d"},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: "a"}},
{&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: "c"}},
{&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: "g"}},
{&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: "d"}},
{&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: "b"}},
{&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: "e"}},
{&influxql.StringPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: "f"}},
{&influxql.StringPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: "i"}},
{&influxql.StringPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: "h"}},
}) {
t.Errorf("unexpected points: %s", spew.Sdump(a))
}
@ -155,15 +156,15 @@ func TestMergeIterator_Boolean(t *testing.T) {
inputs := []*BooleanIterator{
{Points: []influxql.BooleanPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: true},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: true},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: false},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: true},
}},
{Points: []influxql.BooleanPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: true},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: false},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true},
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: false},
}},
{Points: []influxql.BooleanPoint{}},
@ -174,17 +175,18 @@ func TestMergeIterator_Boolean(t *testing.T) {
},
Ascending: true,
})
if a, ok := CompareBooleanIterator(itr, []influxql.BooleanPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: true},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: false},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: true},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: true},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true},
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: false},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: false},
}); !ok {
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: true}},
{&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: true}},
{&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: true}},
{&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: false}},
{&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: false}},
{&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: true}},
{&influxql.BooleanPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: false}},
{&influxql.BooleanPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: false}},
{&influxql.BooleanPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: true}},
}) {
t.Errorf("unexpected points: %s", spew.Sdump(a))
}
@ -207,15 +209,15 @@ func TestMergeIterator_Cast_Float(t *testing.T) {
inputs := []influxql.Iterator{
&IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2},
{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8},
}},
&FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5},
{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7},
{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9},
}},
}
@ -228,14 +230,14 @@ func TestMergeIterator_Cast_Float(t *testing.T) {
})
if a := Iterators([]influxql.Iterator{itr}).ReadAll(); !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0, Value: 1}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 1, Value: 2}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 11, Value: 5}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 13, Value: 6}},
{&influxql.FloatPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 12, Value: 3}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20, Value: 7}},
{&influxql.FloatPoint{Name: "mem", Tags: ParseTags("host=A"), Time: 25, Value: 9}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30, Value: 4}},
{&influxql.FloatPoint{Name: "mem", Tags: ParseTags("host=B"), Time: 11, Value: 8}},
}) {
t.Errorf("unexpected points: %s", spew.Sdump(a))
}