fix(storage): Fix panic when read request results in empty an group

Moves the check to determine if a series has data for the current time
range into the groupBySort function, which is consistent with the
groupNoneSort function.
pull/10616/head
Stuart Carnie 2018-12-12 13:16:29 -07:00
parent cae010be69
commit f2891c3f59
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
1 changed files with 20 additions and 26 deletions

View File

@ -205,8 +205,8 @@ func groupNoneSort(g *groupResultSet) (int, error) {
n := 0
row := cur.Next()
for row != nil {
n++
if allTime || g.seriesHasPoints(row) {
n++
g.km.mergeTagKeys(row.Tags)
}
row = cur.Next()
@ -217,22 +217,16 @@ func groupNoneSort(g *groupResultSet) (int, error) {
}
func groupByNextGroup(g *groupResultSet) GroupCursor {
next:
row := g.rows[g.i]
for i := range g.keys {
g.rgc.vals[i] = row.Tags.Get(g.keys[i])
}
g.km.clear()
allTime := g.req.Hints.HintSchemaAllTime()
c := 0
rowKey := row.SortKey
j := g.i
for j < len(g.rows) && bytes.Equal(rowKey, g.rows[j].SortKey) {
if allTime || g.seriesHasPoints(g.rows[j]) {
g.km.mergeTagKeys(g.rows[j].Tags)
c++
}
g.km.mergeTagKeys(g.rows[j].Tags)
j++
}
@ -242,9 +236,6 @@ next:
g.i = j
if j == len(g.rows) {
g.eof = true
} else if c == 0 {
// no rows with points
goto next
}
return &g.rgc
@ -261,28 +252,31 @@ func groupBySort(g *groupResultSet) (int, error) {
var rows []*SeriesRow
vals := make([][]byte, len(g.keys))
tagsBuf := &tagsBuffer{sz: 4096}
allTime := g.req.Hints.HintSchemaAllTime()
row := cur.Next()
for row != nil {
nr := *row
nr.SeriesTags = tagsBuf.copyTags(nr.SeriesTags)
nr.Tags = tagsBuf.copyTags(nr.Tags)
if allTime || g.seriesHasPoints(row) {
nr := *row
nr.SeriesTags = tagsBuf.copyTags(nr.SeriesTags)
nr.Tags = tagsBuf.copyTags(nr.Tags)
l := 0
for i, k := range g.keys {
vals[i] = nr.Tags.Get(k)
if len(vals[i]) == 0 {
vals[i] = g.nilSort
l := 0
for i, k := range g.keys {
vals[i] = nr.Tags.Get(k)
if len(vals[i]) == 0 {
vals[i] = g.nilSort
}
l += len(vals[i])
}
l += len(vals[i])
}
nr.SortKey = make([]byte, 0, l)
for _, v := range vals {
nr.SortKey = append(nr.SortKey, v...)
}
nr.SortKey = make([]byte, 0, l)
for _, v := range vals {
nr.SortKey = append(nr.SortKey, v...)
}
rows = append(rows, &nr)
rows = append(rows, &nr)
}
row = cur.Next()
}