Merge pull request #13723 from influxdata/sgc/fix/merge
Ensure GroupCursor Keys is union of keys from all GroupCursors of current partition keypull/13732/head
commit
f56b4ef020
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"sort"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/pkg/slices"
|
||||
)
|
||||
|
||||
|
@ -97,6 +98,7 @@ type groupByMergedGroupResultSet struct {
|
|||
nilVal []byte
|
||||
err error
|
||||
|
||||
km models.TagKeysSet
|
||||
gc groupByMergedGroupCursor
|
||||
}
|
||||
|
||||
|
@ -183,7 +185,13 @@ func (r *groupByMergedGroupResultSet) Next() GroupCursor {
|
|||
|
||||
r.gc.first = true
|
||||
r.gc.heap.init(r.resultSets)
|
||||
r.gc.keys = r.groupCursors[0].Keys()
|
||||
|
||||
r.km.Clear()
|
||||
for i := range r.groupCursors {
|
||||
r.km.UnionBytes(r.groupCursors[i].Keys())
|
||||
}
|
||||
|
||||
r.gc.keys = append(r.gc.keys[:0], r.km.KeysBytes()...)
|
||||
r.gc.vals = r.groupCursors[0].PartitionKeyVals()
|
||||
return &r.gc
|
||||
}
|
||||
|
|
|
@ -225,6 +225,24 @@ group:
|
|||
},
|
||||
exp: exp,
|
||||
},
|
||||
{
|
||||
name: "does merge keys",
|
||||
streams: []*sliceStreamReader{
|
||||
newStreamReader(
|
||||
groupByF("_m,tag1", "val00,<nil>", "aaa,tag0=val00", "cpu,tag0=val00,tag1=val11"),
|
||||
groupByF("_m,tag2", "<nil>,val20", "mem,tag1=val10,tag2=val20"),
|
||||
groupByF("_m,tag1,tag2", "<nil>,val21", "mem,tag1=val11,tag2=val21"),
|
||||
),
|
||||
newStreamReader(
|
||||
groupByF("_m,tag0,tag1", "val00,<nil>", "cpu,tag0=val00,tag1=val10", "cpu,tag0=val00,tag1=val12"),
|
||||
groupByF("_m,tag0", "val01,<nil>", "aaa,tag0=val01"),
|
||||
),
|
||||
newStreamReader(
|
||||
groupByF("_m,tag1", "<nil>,val20", "mem,tag1=val11,tag2=val20"),
|
||||
),
|
||||
},
|
||||
exp: exp,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
|
Loading…
Reference in New Issue