Ensure that sorted heaps are merged correctly

When merging streams of system iterators we don't use tags or time.
Instead we add series keys (in the case of, for example, `SHOW SERIES`)
to the `Aux` field of the iterators' elements. This is because we only
emit merged and sorted sets of series key to the client.

We currently use `SortedMergeHeap`s to merge together multiple
iterators, and the comparitor function did not consider `Aux` fields
when determining which heap to pop the next item off during a merge. As
such, `SHOW SERIES` and `SHOW TAG KEYS` (any meta query that gets
converted into a special type of `SELECT`) were returning results in
arbitrary order.

This issue was never noticed on the `inmem` index because the streams
are always duplicates of each other, and of course it doesn't matter if
you arbitrarily merge together two idential, sorted streams...

The issue first manifested itself on the `tsi1` index, but this fix will
apply to both indexes.
pull/8735/head
Edd Robinson 2017-08-14 19:07:49 +01:00
parent ae66d327a7
commit 8c4686fb1b
4 changed files with 281 additions and 15 deletions

View File

@ -38,6 +38,7 @@
- [#8706](https://github.com/influxdata/influxdb/pull/8706): Cursor leak, resulting in an accumulation of `.tsm.tmp` files after compactions.
- [#8712](https://github.com/influxdata/influxdb/pull/8712): Force time expressions to use AND and improve condition parsing.
- [#8716](https://github.com/influxdata/influxdb/pull/8716): Ensure inputs are closed on error. Add runtime GC finalizer as additional guard to close iterators
- [#8695](https://github.com/influxdata/influxdb/issues/8695): Fix merging bug on system iterators.
## v1.3.4 [unreleased]

View File

@ -395,6 +395,12 @@ func (itr *floatSortedMergeIterator) pop() (*FloatPoint, error) {
}
// floatSortedMergeHeap represents a heap of floatSortedMergeHeapItems.
// Items are sorted with the following priority:
// - By their measurement name;
// - By their tag keys/values;
// - By time; or
// - By their Aux field values.
//
type floatSortedMergeHeap struct {
opt IteratorOptions
items []*floatSortedMergeHeapItem
@ -411,7 +417,26 @@ func (h *floatSortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() < yTags.ID()
}
return x.Time < y.Time
if x.Time != y.Time {
return x.Time < y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 < v2
}
}
return false // Times and/or Aux fields are equal.
}
if x.Name != y.Name {
@ -419,7 +444,26 @@ func (h *floatSortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() > yTags.ID()
}
return x.Time > y.Time
if x.Time != y.Time {
return x.Time > y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 > v2
}
}
return false // Times and/or Aux fields are equal.
}
func (h *floatSortedMergeHeap) Push(x interface{}) {
@ -3742,6 +3786,12 @@ func (itr *integerSortedMergeIterator) pop() (*IntegerPoint, error) {
}
// integerSortedMergeHeap represents a heap of integerSortedMergeHeapItems.
// Items are sorted with the following priority:
// - By their measurement name;
// - By their tag keys/values;
// - By time; or
// - By their Aux field values.
//
type integerSortedMergeHeap struct {
opt IteratorOptions
items []*integerSortedMergeHeapItem
@ -3758,7 +3808,26 @@ func (h *integerSortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() < yTags.ID()
}
return x.Time < y.Time
if x.Time != y.Time {
return x.Time < y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 < v2
}
}
return false // Times and/or Aux fields are equal.
}
if x.Name != y.Name {
@ -3766,7 +3835,26 @@ func (h *integerSortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() > yTags.ID()
}
return x.Time > y.Time
if x.Time != y.Time {
return x.Time > y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 > v2
}
}
return false // Times and/or Aux fields are equal.
}
func (h *integerSortedMergeHeap) Push(x interface{}) {
@ -7086,6 +7174,12 @@ func (itr *unsignedSortedMergeIterator) pop() (*UnsignedPoint, error) {
}
// unsignedSortedMergeHeap represents a heap of unsignedSortedMergeHeapItems.
// Items are sorted with the following priority:
// - By their measurement name;
// - By their tag keys/values;
// - By time; or
// - By their Aux field values.
//
type unsignedSortedMergeHeap struct {
opt IteratorOptions
items []*unsignedSortedMergeHeapItem
@ -7102,7 +7196,26 @@ func (h *unsignedSortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() < yTags.ID()
}
return x.Time < y.Time
if x.Time != y.Time {
return x.Time < y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 < v2
}
}
return false // Times and/or Aux fields are equal.
}
if x.Name != y.Name {
@ -7110,7 +7223,26 @@ func (h *unsignedSortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() > yTags.ID()
}
return x.Time > y.Time
if x.Time != y.Time {
return x.Time > y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 > v2
}
}
return false // Times and/or Aux fields are equal.
}
func (h *unsignedSortedMergeHeap) Push(x interface{}) {
@ -10416,6 +10548,12 @@ func (itr *stringSortedMergeIterator) pop() (*StringPoint, error) {
}
// stringSortedMergeHeap represents a heap of stringSortedMergeHeapItems.
// Items are sorted with the following priority:
// - By their measurement name;
// - By their tag keys/values;
// - By time; or
// - By their Aux field values.
//
type stringSortedMergeHeap struct {
opt IteratorOptions
items []*stringSortedMergeHeapItem
@ -10432,7 +10570,26 @@ func (h *stringSortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() < yTags.ID()
}
return x.Time < y.Time
if x.Time != y.Time {
return x.Time < y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 < v2
}
}
return false // Times and/or Aux fields are equal.
}
if x.Name != y.Name {
@ -10440,7 +10597,26 @@ func (h *stringSortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() > yTags.ID()
}
return x.Time > y.Time
if x.Time != y.Time {
return x.Time > y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 > v2
}
}
return false // Times and/or Aux fields are equal.
}
func (h *stringSortedMergeHeap) Push(x interface{}) {
@ -13746,6 +13922,12 @@ func (itr *booleanSortedMergeIterator) pop() (*BooleanPoint, error) {
}
// booleanSortedMergeHeap represents a heap of booleanSortedMergeHeapItems.
// Items are sorted with the following priority:
// - By their measurement name;
// - By their tag keys/values;
// - By time; or
// - By their Aux field values.
//
type booleanSortedMergeHeap struct {
opt IteratorOptions
items []*booleanSortedMergeHeapItem
@ -13762,7 +13944,26 @@ func (h *booleanSortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() < yTags.ID()
}
return x.Time < y.Time
if x.Time != y.Time {
return x.Time < y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 < v2
}
}
return false // Times and/or Aux fields are equal.
}
if x.Name != y.Name {
@ -13770,7 +13971,26 @@ func (h *booleanSortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() > yTags.ID()
}
return x.Time > y.Time
if x.Time != y.Time {
return x.Time > y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 > v2
}
}
return false // Times and/or Aux fields are equal.
}
func (h *booleanSortedMergeHeap) Push(x interface{}) {

View File

@ -393,6 +393,12 @@ func (itr *{{$k.name}}SortedMergeIterator) pop() (*{{$k.Name}}Point, error) {
}
// {{$k.name}}SortedMergeHeap represents a heap of {{$k.name}}SortedMergeHeapItems.
// Items are sorted with the following priority:
// - By their measurement name;
// - By their tag keys/values;
// - By time; or
// - By their Aux field values.
//
type {{$k.name}}SortedMergeHeap struct {
opt IteratorOptions
items []*{{$k.name}}SortedMergeHeapItem
@ -409,15 +415,53 @@ func (h *{{$k.name}}SortedMergeHeap) Less(i, j int) bool {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() < yTags.ID()
}
return x.Time < y.Time
if x.Time != y.Time{
return x.Time < y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 < v2
}
}
return false // Times and/or Aux fields are equal.
}
if x.Name != y.Name {
return x.Name > y.Name
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
} else if xTags, yTags := x.Tags.Subset(h.opt.Dimensions), y.Tags.Subset(h.opt.Dimensions); !xTags.Equals(&yTags) {
return xTags.ID() > yTags.ID()
}
return x.Time > y.Time
if x.Time != y.Time{
return x.Time > y.Time
}
if len(x.Aux) > 0 && len(x.Aux) == len(y.Aux) {
for i := 0; i < len(x.Aux); i++ {
v1, ok1 := x.Aux[i].(string)
v2, ok2 := y.Aux[i].(string)
if !ok1 || !ok2 {
// Unsupported types used in Aux fields. Maybe they
// need to be added here?
return false
} else if v1 == v2 {
continue
}
return v1 > v2
}
}
return false // Times and/or Aux fields are equal.
}
func (h *{{$k.name}}SortedMergeHeap) Push(x interface{}) {

View File

@ -6992,6 +6992,7 @@ func TestServer_Query_ShowQueries_Future(t *testing.T) {
func TestServer_Query_ShowSeries(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
@ -7003,8 +7004,8 @@ func TestServer_Query_ShowSeries(t *testing.T) {
fmt.Sprintf(`cpu,host=server01 value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:01Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=uswest value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:02Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:03Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:04Z").UnixNano()),
fmt.Sprintf(`gpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:05Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2020-11-10T23:00:04Z").UnixNano()),
fmt.Sprintf(`gpu,host=server02,region=useast value=100 %d`, mustParseTime(time.RFC3339Nano, "2020-11-10T23:00:05Z").UnixNano()),
fmt.Sprintf(`gpu,host=server03,region=caeast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:06Z").UnixNano()),
fmt.Sprintf(`disk,host=server03,region=caeast value=100 %d`, mustParseTime(time.RFC3339Nano, "2009-11-10T23:00:07Z").UnixNano()),
}